为了实现一个新功能:只订阅消息的一个子集,例如只需要把严重的错误日志信息写入日志文件(存储到磁盘上),但同时仍然把所有的日志信息输出到控制台中。
绑定(Bindings)
创建绑定
channel.queue_bind(exchange=exchange_name, queue=queue_name)
建立exchange和queue之间的关系,简单理解是这个队列对这个交换器的消息感兴趣。
绑定的时候可以带上一个额外的routing_key参数,为了避免与basic_publish参数混淆,叫做binding key.
创建一个带binding key的绑定
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
binding key的含义取决于交换器的类型,对于fanout类型会忽略这个值。
Driect类型的交换器exchange
使用fanout类型的交换器exchange是广播类型,对于消息的过滤需要使用direct类型
使用direct类型的交换器,交换器将会对binding key和routing key进行精确匹配,从而确定消息该分发到哪个队列。
如上图,可以看到x交换器和两个队列进行绑定,第一个队列使用orange作为binding key,第二个队列有两个绑定,一个使用black,一个使用green。
这样,当routing key为orange的消息发布到交换器,会路由到队列Q1,black和green两个类型路由Q2,其他的所有消息将会被丢弃。
多个绑定(Multiple bindings)
多个队列使用相同的binding key是合法的。
可以添加一个x和Q1之间的绑定,也可以再添加一个x和Q2的绑定。这样以来,指定交换direct类型和fanout广播类型功能相同。
Emmiting logs
将会发送消息到一个direct exchange,把日志级别作为routing key。
这样负责处理接收的脚本可以选择要处理的日志级别。
创建一个direct类型的交换器
channel.exchange_declare(exchange='direct_logs', type='direct')
然后发送一条消息:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
severity值假定为info,warning,error中的一个
订阅(subscribing)
result = channel.queue_declare(exclusive=True)queue_name = result.method.queuefor severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
为每一个日志级别创建一个新的绑定。
例子
将error信息发送到一个队列,将所有信息发送另一个队列
emit_log
#!/usr/bin/env python#-*- coding:utf8 -*-import sys import pikaimport logginglogging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)def emit_log(): pika.connection.Parameters.DEFAULT_HOST = 'localhost' pika.connection.Parameters.DEFAULT_PORT = 5672 pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' pika.connection.Parameters.DEFAULT_USERNAME = 'guosong' pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong' para = pika.connection.Parameters() connection = pika.BlockingConnection(para) channel = connection.channel() #声明一个direct_logs交换器,类型为direct channel.exchange_declare(exchange='direct_logs',type='direct') #指定日志级别 serverity = sys.argv[1] if len(sys.argv) >1 else 'info' message = '.'.join(sys.argv[1:]) or "info:Hello World!" #发送的时候指定routing_key为空,没有绑定队列到交换器上,消息将会丢失 #对于日志类消息,如果没有消费者监听的话,这些消息就会忽略 channel.basic_publish(exchange='logs',routing_key=serverity,body=message) #%r也是string类型 print "[x] Sent %r" % (message,) connection.close()if __name__ == '__main__': emit_log()
接收者
#!/usr/bin/env python#-*- coding:utf8 -*-import sysimport pikaimport logginglogging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)#回调函数,处理消息 def callback(ch, method, properties, body): print " [x] %r " % (body,)def receive_logs(): pika.connection.Parameters.DEFAULT_HOST = 'localhost' pika.connection.Parameters.DEFAULT_PORT = 5672 pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' pika.connection.Parameters.DEFAULT_USERNAME = 'guosong' pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong' para = pika.connection.Parameters() connection = pika.BlockingConnection(para) channel = connection.channel() #声明一个logs交换器,类型为fanout,不允许发布消息到不存在的交换器 channel.exchange_declare(exchange='direct_logs',type='direct') #声明一个随机队列,设置exclusive=True,在该consumer退出的时候,对应的队列被删除 result = channel.queue_declare(exclusive=True) #获取随机队列的名称 queue_name = result.method.queue serverities = sys.argv[1:] if not serverities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % sys.argv[0] sys.exit(1) for serverity in serverities: #绑定交换器和队列 channel.queue_bind(exchange='logs',queue=queue_name,routing_key=serverity) print '[*] Wating for logs.To exit press CTRL+C' #开始消费消息 channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()if __name__ == '__main__': receive_logs()
如果只希望保存warning和error级别的日志到磁盘,只需要打开控制台并输入:
guosong@guosong:~/code/rabbitmq/ch4$ ./receive_logs.py warning error >logs
如果需要所有的话,执行如下命令:
guosong@guosong:~/code/rabbitmq/ch4$ ./receive_logs.py info warning error >logs