`
anzhsoft
  • 浏览: 23458 次
  • 性别: Icon_minigender_1
  • 来自: 天津
文章分类
社区版块
存档分类
最新评论

RabbitMQ消息队列(六):使用主题进行消息分发

 
阅读更多

在上篇文章RabbitMQ消息队列(五):Routing 消息路由 中,我们实现了一个简单的日志系统。Consumer可以监听不同severity的log。但是,这也是它之所以叫做简单日志系统的原因,因为是仅仅能够通过severity设定。不支持更多的标准。

比如syslog unix的日志工具,它可以通过severity (info/warn/crit...) 和模块(auth/cron/kern...)。这可能更是我们想要的:我们可以仅仅需要cron模块的log。

为了实现类似的功能,我们需要用到topic exchange。


1. Topic exchange

对于Message的routing_key是有限制的,不能使任意的。格式是以点号“."分割的字符表。比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。

对于routing_key,有两个特殊字符(在正则表达式里叫元字符):

  • * (星号) 代表任意 一个单词
  • # (hash) 0个或者多个单词

请看下面一个例子:

Producer发送消息时需要设置routing_key,routing_key包含三个单词和两个点号。第一个key是描述了celerity(灵巧,敏捷),第二个是colour(色彩),第三个是species(物种):"<celerity>.<colour>.<species>"。

在这里我们创建了两个绑定: Q1 的binding key 是"*.orange.*"; Q2 是 "*.*.rabbit" 和 "lazy.#":
  • Q1 感兴趣所有orange颜色的动物
  • Q2 感兴趣所有的rabbits和所有的lazy的

比如routing_key是 "quick.orange.rabbit"将会发送到Q1和Q2中。消息"lazy.orange.elephant" 也会发送到Q1和Q2。但是"quick.orange.fox" 会发送到Q1;"lazy.brown.fox"会发送到Q2。"lazy.pink.rabbit" 也会发送到Q2,但是尽管两个routing_key都匹配,它也只是发送一次。"quick.brown.fox" 会被丢弃。

如果发送的单词不是3个呢? 答案要看情况,因为#是可以匹配0个或任意个单词。比如"orange" or "quick.orange.male.rabbit",它们会被丢弃。如果是lazy那么就会进入Q2。类似的还有 "lazy.orange.male.rabbit",尽管它包含四个单词。

Topic exchange和其他exchange

由于有"*" (star) and "#" (hash), Topic exchange 非常强大并且可以转化为其他的exchange:

如果binding_key 是 "#" - 它会接收所有的Message,不管routing_key是什么,就像是fanout exchange。

如果 "*" (star) and "#" (hash) 没有被使用,那么topic exchange就变成了direct exchange。

2. 代码实现

现在我们要refine我们上篇的日志系统。routing keys 有两个部分: "<facility>.<severity>"。

The code for emit_log_topic.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

The code for receive_logs_topic.py:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

3. 运行和结果

接收所有的log:

python receive_logs_topic.py "#"
接收所有kern facility的log:

python receive_logs_topic.py "kern.*"
仅仅接收critical的log:

python receive_logs_topic.py "*.critical"
可以创建多个绑定:

python receive_logs_topic.py "kern.*" "*.critical"
Producer产生一个log:"kern.critical" type:

python emit_log_topic.py "kern.critical" "A critical kernel error"

课后思考题:

  • Will "*" binding catch a message sent with an empty routing key?
  • Will "#.*" catch a message with a string ".." as a key? Will it catch a message with a single word key?
  • How different is "a.*.#" from "a.#"?

尊重原创,转载请注明出处 anzhsoft: http://blog.csdn.net/anzhsoft/article/details/19633079

参考资料:

1. http://www.rabbitmq.com/tutorials/tutorial-five-python.html

分享到:
评论

相关推荐

    基于RabbitMQ的消息路由分发实例

    基于RabbitMQ消息队列的路由分发实例源码,直接可以运行。

    rabbitmq 7种队列实现java版

    文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...

    深入理解RabbitMQ消息队列的使用

    RabbitMQ单节点服务搭建以及集群的搭建、RabbitMQ的整体架构及各个组件的功能、RabbitMQ当中生产者以及消费者的具体实现、消费者如何做到消息的确认、RabbitMQ如何做到消息的公平分发、RabbitMQ当中fanout、direct、...

    C# RabbitMQ 实例代码

    C# 编写的RabbitMQ操作实例 功能包括: 1、循环调度 2、消息确认 3、消息持久化 4、公平分发 Exchange(路由器) direct(明确的路由规则:消费端... fanout (消息广播,将消息分发到exchange上绑定的所有队列上)

    Sringboot整合RabbitMQ(三):发布订阅(Publish/Subscribe)

    在本篇教程中,我们要做的跟之前完全不一样 —— 分发一个消息给多个消费者(consumers)。这种模式被称为” 发布 / 订阅”。 交换器(Exchanges) 在本教程的前面部分,我们发送和接收到队列中的消息,现在是时候在...

    java面试题_消息中间件--RabbitMQ(22题).pdf

    1、上千万条消息在mq中积压了⼏个⼩时还没解决: 2、rabbitmq设置过期时间,部分消息丢失: 3、RabbitMQ 上的⼀个 queue 中存放的 message 是否有数量限制? 4、分布式部署: ...22、rabbitmq队列与消费者的关系?

    RabbitMq

    消息队列解决了什么问题 异步处理 1.串行方式 2.并行方式 3.异步处理 应用结耦 4. 流量削峰 5. 日志处理,即时通讯 Java操作RabbitMq simple 简单队列 work queues 工作队列 公平分发 轮询分发 public/...

    基于RabbitMQ的消息分发应用框架,基于RabbitMQ官方驱动重度优化设计的应用框架

    基于RabbitMQ的消息分发应用框架,基于RabbitMQ官方驱动重度优化设计的应用框架。基本功能有生产消息层面 、消费消息层面

    rabbitmq安装

    RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。

    RabbitMQ基本概念和使用

    很容易使用RabbitMQ简化工作量,一个Consumer进行消息的正常处理,另一个Consumer复制对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange即可,剩下的消息分发工作由...

    queueman:Queueman 是一个适用于 RabbitMQ、Redis 队列的高性能分发中间件。支持延时队列、并发控制、失败自动重试

    Queueman 是一个适用于 RabbitMQ、Redis 队列的高性能分发中间件。支持延时队列、并发控制、失败自动重试。 简单的并发控制 简单配置就可以自动失败后重试 不用再写命令行代码就可以消费队列了 测试理论速度:单机 1...

    rabbitMQ.doc

    rabbitMq简单介绍还有下载,安装及配置,包括一些队列案例(Simble简单队列,.work queues 工作队列 公平分发 轮询分发,订阅模式 publish/subscribe,routing路由模式,Topic 主题模式,rabbitMq的消息确认机制)

    25道RabbitMQ面试题含答案(很全)

    消息在进入队列之前会通过交换器进行路由,使得消息能够按照特定的规则进行分发。此外,RabbitMQ支持构建集群,多个节点可以组成一个集群,并可以根据实际业务需求动态地扩展集群中的节点。它还提供了广泛的语言...

    word源码java-rabbitmq-tutorial-java:RabbitMQ官方教程的翻译和说明--Java版

    生产者和RabbitMQ服务器建立连接和通道,声明路由器,同时为消息设置路由键,这样,所有的消息就会以特定的路由键发给路由器,具体路由器会发送到哪个或哪几个队列,生产者在大部分场景中都不知道。(1个路由器,但...

    hutch:用于处理来自RabbitMQ的消息的系统

    Hutch认为:它使用主题交流进行消息分发,并对消费者和发布者的工作方式进行了一些假设。 使用Hutch,使用者存储在单独的文件中,并包括Hutch::Consumer模块。 然后由连接到RabbitMQ的命令行运行程序加载它们,...

    RabbitMQ_Project.zip

    rabbitmq的入门实例,有简单队列,轮询分发模式,公平分发模式,交换机,路由模式,topic模式

    基于C#的游戏服务器后台.zip

    消息队列与分发: 实现消息队列系统,用于缓冲、排序和分发来自客户端的请求。可以使用内置的ConcurrentQueue等线程安全数据结构,或者引入成熟的队列服务(如RabbitMQ、Azure Service Bus)。 数据存储: 配置...

Global site tag (gtag.js) - Google Analytics