引言
最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供自己和大家一块儿学习!
1.先来介绍RabbitMQ中的成员
- Producer(生产者): 将消息发送到Exchange
- Exchange(交换器):将从生产者接收到的消息路由到Queue
- Queue(队列):存放供消费者消费的消息
- BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue)
- RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
- Consumer(消费者):从Queue中获取消息
下面是各个成员的作用图解

引入依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version></dependency>
2.先来介绍Exchange
这里将着重于介绍Exchange和Queue的各个参数解释
先来看看Exchange中都有哪些属性
- exchange:名称
- type:类型
- durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
- autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除
- internal:是否内置的,如果为true,只能通过Exchange到Exchange
- arguments:结构化参数

下面这个类用于创建一个与RabbitMQ的Connection(连接),该Connection用于创建Channel(信道),Channel是消息读写的通道,也就是我们的操作都会在Channel的基础之上进行

2.1先使用最简单的参数构建Exchange
exchangeDeclare(String exchange, String type)

进入RabbitMQ可视化界面可以看到,RabbitMQ已经为我们创建了exchange.0,类型为direct

具体释意
name 名称 type 类型 Features 特征 Message rate in 消息速率输入 Message rate out 消息速率输出
2.2接下来是三个参数,也就是加上了是否持久化,同时保留先前两个参数的exchange.0,之前我们已经创建了exchange.0,那么我们再创建一次会怎样
exchangeDeclare(String exchange, String type, boolean durable)

运行成功,并没有报错,因为只要你设置的的设置是一样的,那么就不会报错,如果设置的不一样,那么就会报错,后面会进行验证
这里我们发现exchange.2多了一个D标识,这个D是durable也就是持久化,而exchange.0没有持久化,也就是默认非持久化

接下来验证这个持久化有什么作用
关闭rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
重新进入可视化界面,Exchange就只剩下持久化的了

2.3接下来是五个参数的
多了两个参数,autoDelete和arguments
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
下面创建了两个Exchange
exchange.3自动删除为false
exchange.4自动删除为true
由于这里是没有绑定Queue的,那么exchange.4将在创建后就被删除掉?

执行上面的代码

exchange.4还活的好好的,这是因为我们必须在绑定Queue之后再失去绑定才会被删除,否则为什么不直接抛异常,接下来进行验证
下面直接通过可视化工具创建一个名称为queue.4的Queue

英文释义
Name 名称Features 特征Status 状态Ready 是否准备好Unacked 未确认Total 总计incoming 进来的deliver 传送get 得到ack 确认
2.5讲解完Exchange的参数,再来看Queue的参数,就会发现只有一个exclusive未讲
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments
exclusive:是否排他,如果未true,则只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除
在执行完下面代码,查看可视化界面,发现queue中并没有exclusive.queue,因为在connection关闭后,该queue也会自动删除
创建实例
package com.tiandy.illegal.util.mq;import com.alibaba.fastjson.JSONObject;import com.rabbitmq.client.*;import com.tiandy.illegal.bo.CLS_ManageService;import com.tiandy.illegal.bo.CLS_ManageServiceImpl;import com.tiandy.illegal.util.CLS_ILLEGAL_Error;import com.tiandy.illegal.vo.CLS_VO_Message;import com.tiandy.illegal.vo.CLS_VO_Record;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.ResourceBundle;public class RabbitMQSend { //rabbitmq连接 public static Connection connection = null; //rabbitmq通道 public static Channel channel = null; //连接状态标识 public static boolean connectStatus = false; // 配置 static ResourceBundle resourceBundle = ResourceBundle.getBundle("mq/artemisConfig"); // 交换机 exchangeTemp private static String rabbitmq_exchange = resourceBundle.getString("rabbitmq_exchange"); // 队列名 queue_vbs_vehicle_record private static String rabbitmq_queue = resourceBundle.getString("rabbitmq_queue"); // service CLS_ManageService cls_manageService = new CLS_ManageServiceImpl(); static ConnectionFactory factory = null; public void initialize() { try { //连接工厂 if (null == factory) { factory = new ConnectionFactory(); factory= RabbitMQUtil.getRabbitMQConnectionFactory(); // 关闭通道与连接 closeConnection(); connection = factory.newConnection(); channel = connection.createChannel(); // 声明交换机 // channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.DIRECT ,true); connectStatus = true; } } catch (Exception e) { connectStatus = false; e.printStackTrace(); // log.error("RabbitMQSend method initialize:" + e.getMessage(), e); } } //关闭连接 public void closeConnection() { try { if (channel != null) { if (channel.isOpen()) { channel.close(); channel = null; } } } catch (Exception e) { //log.error("RabbitMQSend closeChannel error " + e); e.printStackTrace(); } try { if (connection != null) { if (connection.isOpen()) { connection.close(); connection = null; } } } catch (Exception e) { // log.error("RabbitMQSend closeConnection error " + e); e.printStackTrace(); } }/** * 监听消息队列,获取数据 */ public void queueDeclareExchange() { //声明交换机 try { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-max-length", 100000); // 设置最大存储消息数 // 声明交换机 (交换机参数) channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.FANOUT, true); // 消息持久化 (队列参数) channel.queueDeclare(rabbitmq_queue, true, false, false, args); // 交换机与队列绑定 channel.queueBind(rabbitmq_queue, rabbitmq_exchange, ""); // 消费者限制 //channel.basicQos(1); Consumer consumer = new DefaultConsumer(channel) { int inRecord=0; // 插入记录数量 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //接收到的消息 String msg = new String(body, "UTF-8"); // 判断数据是否允许接入 int check = checkMessage(msg); if (check == CLS_ILLEGAL_Error.ERROR_OK) { // 消息转换至VO CLS_VO_Message msgVo = cls_manageService.getMessageVo(msg); // 判断数据,分开处理白车牌数据与其他数据,每次新增一条 int count = cls_manageService.decideData(msgVo); if(count>0){ inRecord+=count; System.out.println(" 已消费消息:"+envelope.getDeliveryTag()+" 插入记录数:" + inRecord); } } // 单条消息确认(第几条,是否多条) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 设置消息手动确认 (队列名,是否自动确认,consumer) channel.basicConsume(rabbitmq_queue, false, consumer); } catch (IOException e) { e.printStackTrace(); } } /** * 方法说明:监测接收信息 * * @param message * @return @修改人及日期: @修改描述: @其他: */ public int checkMessage(String message) { // TODO 监测数据格式及是否允许接入 int check = 0; CLS_VO_Message vo_Message = null; try { vo_Message = JSONObject.parseObject(message, CLS_VO_Message.class); } catch (Exception e) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getStorage_id() == null || "".equals(vo_Message.getStorage_id())) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getCap_pic() == null || vo_Message.getCap_pic().size() == 0) { return CLS_ILLEGAL_Error.ERROR_PARAM; } if (vo_Message.getTotal_info() == null) { return CLS_ILLEGAL_Error.ERROR_PARAM; } CLS_VO_Record total_info = vo_Message.getTotal_info(); if (total_info.getTollgateID() == null || "".equals(total_info.getTollgateID())) { return CLS_ILLEGAL_Error.ERROR_PARAM; } return check; }}至此,简单的参数讲解和应用就总结完了!
原文转载:http://www.shaoqun.com/a/481472.html
淘粉8:https://www.ikjzd.com/w/1725
adore:https://www.ikjzd.com/w/2202
跨境通电子商务网站:https://www.ikjzd.com/w/1329
引言最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供自己和大家一块儿学习!1.先来介绍RabbitMQ中的成员Producer(生产者):将消息发送到ExchangeExchange(交换器):将从生产者接收到的消息路由到QueueQueue(队列):存放供消费者消费的消息B
巴克莱银行:https://www.ikjzd.com/w/2775
bsci:https://www.ikjzd.com/w/2339
泰姬陵旅游注意事项 :http://tour.shaoqun.com/a/17471.html
台山到海角城古舟岛怎么走?台山到古舟岛自驾车路线?:http://tour.shaoqun.com/a/67591.html
Jungle Scout和AMZScout和ASINspector:https://www.ikjzd.com/tl/7938
没有评论:
发表评论
注意:只有此博客的成员才能发布评论。