博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ之路由
阅读量:5229 次
发布时间:2019-06-14

本文共 4426 字,大约阅读时间需要 14 分钟。

为了实现一个新功能:只订阅消息的一个子集,例如只需要把严重的错误日志信息写入日志文件(存储到磁盘上),但同时仍然把所有的日志信息输出到控制台中。

 

绑定(Bindings)

创建绑定

channel.queue_bind(exchange=exchange_name,                   queue=queue_name)

建立exchange和queue之间的关系,简单理解是这个队列对这个交换器的消息感兴趣。

绑定的时候可以带上一个额外的routing_key参数,为了避免与basic_publish参数混淆,叫做binding key.

创建一个带binding key的绑定

channel.queue_bind(exchange=exchange_name,                   queue=queue_name,                   routing_key='black')

binding key的含义取决于交换器的类型,对于fanout类型会忽略这个值。

 

Driect类型的交换器exchange

使用fanout类型的交换器exchange是广播类型,对于消息的过滤需要使用direct类型

使用direct类型的交换器,交换器将会对binding key和routing key进行精确匹配,从而确定消息该分发到哪个队列。

如上图,可以看到x交换器和两个队列进行绑定,第一个队列使用orange作为binding key,第二个队列有两个绑定,一个使用black,一个使用green。

这样,当routing key为orange的消息发布到交换器,会路由到队列Q1,black和green两个类型路由Q2,其他的所有消息将会被丢弃。

 

多个绑定(Multiple bindings)

多个队列使用相同的binding key是合法的。

可以添加一个x和Q1之间的绑定,也可以再添加一个x和Q2的绑定。这样以来,指定交换direct类型和fanout广播类型功能相同。

 

Emmiting logs

将会发送消息到一个direct exchange,把日志级别作为routing key。

这样负责处理接收的脚本可以选择要处理的日志级别。

 

创建一个direct类型的交换器

channel.exchange_declare(exchange='direct_logs',                         type='direct')

然后发送一条消息:

channel.basic_publish(exchange='direct_logs',                      routing_key=severity,                      body=message)

severity值假定为info,warning,error中的一个

 

订阅(subscribing)

result = channel.queue_declare(exclusive=True)queue_name = result.method.queuefor severity in severities:    channel.queue_bind(exchange='direct_logs',                       queue=queue_name,                       routing_key=severity)

为每一个日志级别创建一个新的绑定。

 

例子

将error信息发送到一个队列,将所有信息发送另一个队列

 

 emit_log

#!/usr/bin/env python#-*- coding:utf8 -*-import sys import pikaimport logginglogging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)def emit_log():    pika.connection.Parameters.DEFAULT_HOST = 'localhost'    pika.connection.Parameters.DEFAULT_PORT = 5672    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/'     pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'    pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'    para = pika.connection.Parameters()    connection = pika.BlockingConnection(para)    channel = connection.channel()    #声明一个direct_logs交换器,类型为direct    channel.exchange_declare(exchange='direct_logs',type='direct')    #指定日志级别    serverity = sys.argv[1] if len(sys.argv) >1 else 'info'    message = '.'.join(sys.argv[1:]) or "info:Hello World!"    #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失    #对于日志类消息,如果没有消费者监听的话,这些消息就会忽略    channel.basic_publish(exchange='logs',routing_key=serverity,body=message)    #%r也是string类型    print "[x] Sent %r" % (message,)    connection.close()if __name__ == '__main__':                                                                       emit_log()

 接收者

#!/usr/bin/env python#-*- coding:utf8 -*-import sysimport pikaimport logginglogging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)#回调函数,处理消息                                                                          def callback(ch, method, properties, body):    print " [x] %r " % (body,)def receive_logs():    pika.connection.Parameters.DEFAULT_HOST = 'localhost'    pika.connection.Parameters.DEFAULT_PORT = 5672    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/'    pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'    pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'    para = pika.connection.Parameters()    connection = pika.BlockingConnection(para)    channel = connection.channel()    #声明一个logs交换器,类型为fanout,不允许发布消息到不存在的交换器    channel.exchange_declare(exchange='direct_logs',type='direct')    #声明一个随机队列,设置exclusive=True,在该consumer退出的时候,对应的队列被删除    result = channel.queue_declare(exclusive=True)    #获取随机队列的名称    queue_name = result.method.queue    serverities = sys.argv[1:]    if not serverities:        print >> sys.stderr, "Usage: %s [info] [warning] [error]" %  sys.argv[0]        sys.exit(1)    for serverity in serverities:         #绑定交换器和队列        channel.queue_bind(exchange='logs',queue=queue_name,routing_key=serverity)    print '[*] Wating for logs.To exit press CTRL+C'    #开始消费消息    channel.basic_consume(callback,queue=queue_name,no_ack=True)    channel.start_consuming()if __name__ == '__main__':    receive_logs()

如果只希望保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

guosong@guosong:~/code/rabbitmq/ch4$ ./receive_logs.py warning error >logs

如果需要所有的话,执行如下命令:

guosong@guosong:~/code/rabbitmq/ch4$ ./receive_logs.py info warning error >logs

  

 

 

 

转载于:https://www.cnblogs.com/gsblog/p/3823544.html

你可能感兴趣的文章
连接Oracle需要jar包和javadoc文档的下载
查看>>
UVA 10976 - Fractions Again?!
查看>>
Dreamweaver cc新版本css单行显示
查看>>
【android】安卓的权限提示及版本相关
查看>>
JavaScript可否多线程? 深入理解JavaScript定时机制
查看>>
IOS基础学习
查看>>
PHP 导出 Excell
查看>>
python之-框架
查看>>
Java基础教程——网络基础知识
查看>>
[Web] 如何实现Web服务器和应用服务器的负载均衡?
查看>>
创建文件夹命令
查看>>
自己到底要的是什么
查看>>
this 指向
查看>>
Kruskal基础最小生成树
查看>>
BZOJ.4819.[SDOI2017]新生舞会(01分数规划 费用流SPFA)
查看>>
ubuntu 14.04 安装搜狗拼音输入法
查看>>
浅谈算法和数据结构: 一 栈和队列
查看>>
[WebMatrix] 如何将SQL Compact 4.0 移转至SQL Server 2008 Express
查看>>
Java内部类详解
查看>>
python-基础
查看>>