博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbit工作队列模式
阅读量:6689 次
发布时间:2019-06-25

本文共 6278 字,大约阅读时间需要 20 分钟。

工作队列比简单队列在消费者这边多了一个方法。

channel.basicQos(1);公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)

生产者:

1 package com.kf.queueDemo.fairQueue; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 /**10  * 公平模式发送消息11  * @author kf12  *13  */14 public class FairProducer {15     16     //队列名称17     private static String QUEUENAME = "SIMPLEQUEUE";18     19     public static void main(String[] args) throws IOException, TimeoutException{20         Connection connection = RabbitConnectionUtils.getConnection();21         22         //创建通道23         Channel channel = connection.createChannel();24         25         //通道里放入队列26         /**27          * 第一个参数是  队列名称28          * 第二个参数指 要不要持久化29          */30         channel.queueDeclare(QUEUENAME, false, false, false, null);31         32 /*        //消息体33         String mes = "demo_message汉字";34         35         //发送消息36         *//**37          * 参数为  exchange, routingKey, props, body38          * exchange   交换机39          * routingKey 路由键40          * 41          * body 消息体42          *//*43         channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/44         45         /**46          * 集群环境下,多个消费者情况下。消费者默认采用均摊47          */48         for(int i=1; i<11; i++){49             String mes = "demo_message汉字"+i;50             System.out.println("发送消息"+mes);51             channel.basicPublish("", QUEUENAME, null, mes.getBytes());52         }53         54         55 //        System.out.println("发送消息"+mes);56         57         channel.close();58         connection.close();59     }60 61 }

消费者1:

1 package com.kf.queueDemo.fairQueue; 2  3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5  6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.DefaultConsumer;10 import com.rabbitmq.client.Envelope;11 import com.rabbitmq.client.AMQP.BasicProperties;12 /**13  * 公平模式消费者14  * @author kf15  *16  */17 public class FairConsumer1 {18     //队列名称19     private static String QUEUENAME = "SIMPLEQUEUE";20     21     public static void main(String[] args) throws IOException, TimeoutException{22         System.out.println("01开始接收消息");23         Connection connection = RabbitConnectionUtils.getConnection();24         25         //创建通道26         final Channel channel = connection.createChannel();27         28         //通道里放入队列29         /**30          * 第一个参数是  队列名称31          * 第二个参数指 要不要持久化32          */33         channel.queueDeclare(QUEUENAME, false, false, false, null);34         35         //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)36         channel.basicQos(1);37         38         DefaultConsumer consumer = new DefaultConsumer(channel){39             //监听队列40                 @Override41                 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,42                         byte[] body) throws IOException {
try {43 Thread.sleep(500);44 } catch (Exception e) {45 }finally {46 System.out.println("------------进入监听---------");47 String s = new String(body, "utf-8");48 System.out.println("获取到的消息是:"+s);49 //手动应答。50 /**51 * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式52 */53 channel.basicAck(envelope.getDeliveryTag(), false);54 }}55 };56 57 //设置应答模式58 /**59 * 参数: 对列名,是否自动签收,监听的类60 */61 System.out.println("获取消息的方法之前");62 channel.basicConsume(QUEUENAME, false, consumer);63 System.out.println("获取消息的方法之后");64 65 }66 67 68 }

消费者2:

package com.kf.queueDemo.fairQueue;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.kf.utils.RabbitConnectionUtils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;/** * 公平模式消费者 * @author kf * */public class FairConsumer2 {    //队列名称    private static String QUEUENAME = "SIMPLEQUEUE";        public static void main(String[] args) throws IOException, TimeoutException{        System.out.println("02开始接收消息");        Connection connection = RabbitConnectionUtils.getConnection();                //创建通道        final Channel channel = connection.createChannel();                //通道里放入队列        /**         * 第一个参数是  队列名称         * 第二个参数指 要不要持久化         */        channel.queueDeclare(QUEUENAME, false, false, false, null);                //公平队列消费(参数设置为1,表示消费者消费完一条才会去接受再次发来的消息)                channel.basicQos(1);                DefaultConsumer consumer = new DefaultConsumer(channel){            //监听队列                @Override                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,                        byte[] body) throws IOException {                    try {                        Thread.sleep(1000);                    } catch (Exception e) {                    }finally {                        System.out.println("------------进入监听---------");                        String s = new String(body, "utf-8");                        System.out.println("获取到的消息是:"+s);                        //手动应答。                        /**                         * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式                         */                        channel.basicAck(envelope.getDeliveryTag(), false);                    }                                    }        };                //设置应答模式        /**         * 参数:  对列名,是否自动签收,监听的类         */        System.out.println("获取消息的方法之前");        channel.basicConsume(QUEUENAME, false, consumer);        System.out.println("获取消息的方法之后");            }}

 

转载于:https://www.cnblogs.com/fuguang/p/10660554.html

你可能感兴趣的文章
Android RecyclerView利用Glide加载大量图片into(Target)导致OOM异常
查看>>
UGUI表情系统解决方案
查看>>
ubuntu 下执行定时任务
查看>>
将td中文字过长的部分变成省略号显示的小技巧
查看>>
Cesium随笔(1)部署自己的项目 【转】
查看>>
.NET 程序集单元测试工具 SmokeTest 应用指南
查看>>
HTTP Health Checks
查看>>
为什么正态分布如此普遍
查看>>
jQuery事件
查看>>
BBS论坛(三十)
查看>>
通过PMP考试
查看>>
轻松看懂Java字节码
查看>>
2011年总结以及2012的展望
查看>>
AE TIN的切割
查看>>
ASP.NET图片上传,删除
查看>>
Visual Studio 2010 创建的WCF服务 第一个应用
查看>>
redis 下载启动,设置、查询超时时间
查看>>
WinForm构造函数的作用
查看>>
2016第42周五
查看>>
centos7 取消自动锁屏
查看>>