工作队列比简单队列在消费者这边多了一个方法。
channel.basicQos(1);公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)
生产者:
1 package com.kf.queueDemo.fairQueue; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 /**10 * 公平模式发送消息11 * @author kf12 *13 */14 public class FairProducer {15 16 //队列名称17 private static String QUEUENAME = "SIMPLEQUEUE";18 19 public static void main(String[] args) throws IOException, TimeoutException{20 Connection connection = RabbitConnectionUtils.getConnection();21 22 //创建通道23 Channel channel = connection.createChannel();24 25 //通道里放入队列26 /**27 * 第一个参数是 队列名称28 * 第二个参数指 要不要持久化29 */30 channel.queueDeclare(QUEUENAME, false, false, false, null);31 32 /* //消息体33 String mes = "demo_message汉字";34 35 //发送消息36 *//**37 * 参数为 exchange, routingKey, props, body38 * exchange 交换机39 * routingKey 路由键40 * 41 * body 消息体42 *//*43 channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/44 45 /**46 * 集群环境下,多个消费者情况下。消费者默认采用均摊47 */48 for(int i=1; i<11; i++){49 String mes = "demo_message汉字"+i;50 System.out.println("发送消息"+mes);51 channel.basicPublish("", QUEUENAME, null, mes.getBytes());52 }53 54 55 // System.out.println("发送消息"+mes);56 57 channel.close();58 connection.close();59 }60 61 }
消费者1:
1 package com.kf.queueDemo.fairQueue; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.DefaultConsumer;10 import com.rabbitmq.client.Envelope;11 import com.rabbitmq.client.AMQP.BasicProperties;12 /**13 * 公平模式消费者14 * @author kf15 *16 */17 public class FairConsumer1 {18 //队列名称19 private static String QUEUENAME = "SIMPLEQUEUE";20 21 public static void main(String[] args) throws IOException, TimeoutException{22 System.out.println("01开始接收消息");23 Connection connection = RabbitConnectionUtils.getConnection();24 25 //创建通道26 final Channel channel = connection.createChannel();27 28 //通道里放入队列29 /**30 * 第一个参数是 队列名称31 * 第二个参数指 要不要持久化32 */33 channel.queueDeclare(QUEUENAME, false, false, false, null);34 35 //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)36 channel.basicQos(1);37 38 DefaultConsumer consumer = new DefaultConsumer(channel){39 //监听队列40 @Override41 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,42 byte[] body) throws IOException { try {43 Thread.sleep(500);44 } catch (Exception e) {45 }finally {46 System.out.println("------------进入监听---------");47 String s = new String(body, "utf-8");48 System.out.println("获取到的消息是:"+s);49 //手动应答。50 /**51 * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式52 */53 channel.basicAck(envelope.getDeliveryTag(), false);54 }}55 };56 57 //设置应答模式58 /**59 * 参数: 对列名,是否自动签收,监听的类60 */61 System.out.println("获取消息的方法之前");62 channel.basicConsume(QUEUENAME, false, consumer);63 System.out.println("获取消息的方法之后");64 65 }66 67 68 }
消费者2:
package com.kf.queueDemo.fairQueue;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.kf.utils.RabbitConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;/** * 公平模式消费者 * @author kf * */public class FairConsumer2 { //队列名称 private static String QUEUENAME = "SIMPLEQUEUE"; public static void main(String[] args) throws IOException, TimeoutException{ System.out.println("02开始接收消息"); Connection connection = RabbitConnectionUtils.getConnection(); //创建通道 final Channel channel = connection.createChannel(); //通道里放入队列 /** * 第一个参数是 队列名称 * 第二个参数指 要不要持久化 */ channel.queueDeclare(QUEUENAME, false, false, false, null); //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息) channel.basicQos(1); DefaultConsumer consumer = new DefaultConsumer(channel){ //监听队列 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (Exception e) { }finally { System.out.println("------------进入监听---------"); String s = new String(body, "utf-8"); System.out.println("获取到的消息是:"+s); //手动应答。 /** * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式 */ channel.basicAck(envelope.getDeliveryTag(), false); } } }; //设置应答模式 /** * 参数: 对列名,是否自动签收,监听的类 */ System.out.println("获取消息的方法之前"); channel.basicConsume(QUEUENAME, false, consumer); System.out.println("获取消息的方法之后"); }}