2020年10月22日星期四

RabbitMQ最核心的交换机和队列Exchange、Queue详解

引言

    最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供自己和大家一块儿学习!

1.先来介绍RabbitMQ中的成员

  • Producer(生产者): 将消息发送到Exchange
  • Exchange(交换器):将从生产者接收到的消息路由到Queue
  • Queue(队列):存放供消费者消费的消息
  • BindingKey(绑定键):建立Exchange与Queue之间的关系(个人看作是一种规则,也就是Exchange将什么样的消息路由到Queue)
  • RoutingKey(路由键):Producer发送消息与路由键给Exchange,Exchange将判断RoutingKey是否符合BindingKey,如何则将该消息路由到绑定的Queue
  • Consumer(消费者):从Queue中获取消息

下面是各个成员的作用图解

 

 

 

 

 

 

引入依赖

<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version></dependency>

 

2.先来介绍Exchange

这里将着重于介绍Exchange和Queue的各个参数解释

先来看看Exchange中都有哪些属性

  • exchange:名称
  • type:类型
  • durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
  • autoDelete:是否自动删除,如果没有与之绑定的Queue,直接删除
  • internal:是否内置的,如果为true,只能通过Exchange到Exchange
  • arguments:结构化参数

 

 

 下面这个类用于创建一个与RabbitMQ的Connection(连接),该Connection用于创建Channel(信道),Channel是消息读写的通道,也就是我们的操作都会在Channel的基础之上进行

 

 

 2.1先使用最简单的参数构建Exchange
exchangeDeclare(String exchange, String type)

 

 

 进入RabbitMQ可视化界面可以看到,RabbitMQ已经为我们创建了exchange.0,类型为direct

 

 

具体释意

 name      名称 type      类型 Features     特征 Message rate in  消息速率输入 Message rate out  消息速率输出

2.2接下来是三个参数,也就是加上了是否持久化,同时保留先前两个参数的exchange.0,之前我们已经创建了exchange.0,那么我们再创建一次会怎样

  exchangeDeclare(String exchange, String type, boolean durable)

 

运行成功,并没有报错,因为只要你设置的的设置是一样的,那么就不会报错,如果设置的不一样,那么就会报错,后面会进行验证

这里我们发现exchange.2多了一个D标识,这个D是durable也就是持久化,而exchange.0没有持久化,也就是默认非持久化

 

 

 接下来验证这个持久化有什么作用
关闭rabbitmq
rabbitmqctl stop_app
启动rabbitmq
rabbitmqctl start_app
重新进入可视化界面,Exchange就只剩下持久化的了

 

 

 2.3接下来是五个参数的

多了两个参数,autoDelete和arguments
exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)

下面创建了两个Exchange
exchange.3自动删除为false
exchange.4自动删除为true
由于这里是没有绑定Queue的,那么exchange.4将在创建后就被删除掉?

 

 

 执行上面的代码

 

exchange.4还活的好好的,这是因为我们必须在绑定Queue之后再失去绑定才会被删除,否则为什么不直接抛异常,接下来进行验证
下面直接通过可视化工具创建一个名称为queue.4的Queue

 

 

 

 

 英文释义

Name   名称Features  特征Status  状态Ready  是否准备好Unacked  未确认Total   总计incoming  进来的deliver  传送get    得到ack    确认

2.5讲解完Exchange的参数,再来看Queue的参数,就会发现只有一个exclusive未讲
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments

exclusive:是否排他,如果未true,则只在第一次创建它的Connection中有效,当Connection关闭,该Queue也会被删除

在执行完下面代码,查看可视化界面,发现queue中并没有exclusive.queue,因为在connection关闭后,该queue也会自动删除

创建实例

package com.tiandy.illegal.util.mq;import com.alibaba.fastjson.JSONObject;import com.rabbitmq.client.*;import com.tiandy.illegal.bo.CLS_ManageService;import com.tiandy.illegal.bo.CLS_ManageServiceImpl;import com.tiandy.illegal.util.CLS_ILLEGAL_Error;import com.tiandy.illegal.vo.CLS_VO_Message;import com.tiandy.illegal.vo.CLS_VO_Record;import java.io.IOException;import java.util.HashMap;import java.util.Map;import java.util.ResourceBundle;public class RabbitMQSend { //rabbitmq连接 public static Connection connection = null; //rabbitmq通道 public static Channel channel = null; //连接状态标识 public static boolean connectStatus = false; // 配置 static ResourceBundle resourceBundle = ResourceBundle.getBundle("mq/artemisConfig"); // 交换机 exchangeTemp private static String rabbitmq_exchange = resourceBundle.getString("rabbitmq_exchange"); // 队列名 queue_vbs_vehicle_record private static String rabbitmq_queue = resourceBundle.getString("rabbitmq_queue"); // service CLS_ManageService cls_manageService = new CLS_ManageServiceImpl(); static ConnectionFactory factory = null; public void initialize() {  try {   //连接工厂   if (null == factory) {    factory = new ConnectionFactory();    factory= RabbitMQUtil.getRabbitMQConnectionFactory();    // 关闭通道与连接    closeConnection();    connection = factory.newConnection();    channel = connection.createChannel();    // 声明交换机    // channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.DIRECT ,true);    connectStatus = true;   }  } catch (Exception e) {   connectStatus = false;   e.printStackTrace();   // log.error("RabbitMQSend method initialize:" + e.getMessage(), e);  } } //关闭连接 public void closeConnection() {  try {   if (channel != null) {    if (channel.isOpen()) {     channel.close();     channel = null;    }   }  } catch (Exception e) {   //log.error("RabbitMQSend closeChannel error " + e);   e.printStackTrace();  }  try {   if (connection != null) {    if (connection.isOpen()) {     connection.close();     connection = null;    }   }  } catch (Exception e) {   // log.error("RabbitMQSend closeConnection error " + e);   e.printStackTrace();  } }/**  * 监听消息队列,获取数据  */ public void queueDeclareExchange() {  //声明交换机  try {   Map<String, Object> args = new HashMap<String, Object>();   args.put("x-max-length", 100000); // 设置最大存储消息数   // 声明交换机 (交换机参数)   channel.exchangeDeclare(rabbitmq_exchange, BuiltinExchangeType.FANOUT, true);   // 消息持久化 (队列参数)   channel.queueDeclare(rabbitmq_queue, true, false, false, args);   // 交换机与队列绑定   channel.queueBind(rabbitmq_queue, rabbitmq_exchange, "");   // 消费者限制   //channel.basicQos(1);   Consumer consumer = new DefaultConsumer(channel) {    int inRecord=0; // 插入记录数量    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {     //接收到的消息     String msg = new String(body, "UTF-8");     // 判断数据是否允许接入     int check = checkMessage(msg);     if (check == CLS_ILLEGAL_Error.ERROR_OK) {      // 消息转换至VO      CLS_VO_Message msgVo = cls_manageService.getMessageVo(msg);      // 判断数据,分开处理白车牌数据与其他数据,每次新增一条      int count = cls_manageService.decideData(msgVo);      if(count>0){       inRecord+=count;       System.out.println(" 已消费消息:"+envelope.getDeliveryTag()+" 插入记录数:" + inRecord);      }     }     // 单条消息确认(第几条,是否多条)     channel.basicAck(envelope.getDeliveryTag(),false);    }   };   // 设置消息手动确认 (队列名,是否自动确认,consumer)   channel.basicConsume(rabbitmq_queue, false, consumer);  } catch (IOException e) {   e.printStackTrace();  } } /**  * 方法说明:监测接收信息  *  * @param message  * @return @修改人及日期: @修改描述: @其他:  */ public int checkMessage(String message) {  // TODO 监测数据格式及是否允许接入  int check = 0;  CLS_VO_Message vo_Message = null;  try {   vo_Message = JSONObject.parseObject(message, CLS_VO_Message.class);  } catch (Exception e) {   return CLS_ILLEGAL_Error.ERROR_PARAM;  }  if (vo_Message.getStorage_id() == null || "".equals(vo_Message.getStorage_id())) {   return CLS_ILLEGAL_Error.ERROR_PARAM;  }  if (vo_Message.getCap_pic() == null || vo_Message.getCap_pic().size() == 0) {   return CLS_ILLEGAL_Error.ERROR_PARAM;  }  if (vo_Message.getTotal_info() == null) {   return CLS_ILLEGAL_Error.ERROR_PARAM;  }  CLS_VO_Record total_info = vo_Message.getTotal_info();  if (total_info.getTollgateID() == null || "".equals(total_info.getTollgateID())) {   return CLS_ILLEGAL_Error.ERROR_PARAM;  }  return check; }}

 至此,简单的参数讲解和应用就总结完了!

原文转载:http://www.shaoqun.com/a/481472.html

淘粉8:https://www.ikjzd.com/w/1725

adore:https://www.ikjzd.com/w/2202

跨境通电子商务网站:https://www.ikjzd.com/w/1329


引言最近公司项目中,车辆大数据的推送和接收同步都用到了RabbitMQ消息中间件,对于其中最核心的交换机和队列Exchange、Queue的参数配置和使用,再此简单总结一下,供自己和大家一块儿学习!1.先来介绍RabbitMQ中的成员Producer(生产者):将消息发送到ExchangeExchange(交换器):将从生产者接收到的消息路由到QueueQueue(队列):存放供消费者消费的消息B
巴克莱银行:https://www.ikjzd.com/w/2775
bsci:https://www.ikjzd.com/w/2339
泰姬陵旅游注意事项 :http://tour.shaoqun.com/a/17471.html
台山到海角城古舟岛怎么走?台山到古舟岛自驾车路线?:http://tour.shaoqun.com/a/67591.html
Jungle Scout和AMZScout和ASINspector:https://www.ikjzd.com/tl/7938

没有评论:

发表评论

注意:只有此博客的成员才能发布评论。