RabbitMQ快速入门

RabbitMQ是对高级消息队列协议(Advanced Message Queueing Protocol, AMQP)的实现,本文主要介绍一些AMQP协议相关的基本概念,在对AMQP有一定的认知之后,能够帮助我们更好的了解RabbitMQ消息中间件。

1.AMQP

AMQP[1],即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品、不同开发语言等条件的限制。

换句话说,AMQP可以使符合规范的客户端应用程序与符合规范的消息中间件代理进行通信。

引用:
[1].
百度百科“AMQP”词条:https://baike.baidu.com/item/AMQP/8354716?fr=aladdin

2.消息代理(message broker)

AMQP中,有一个重要的名词——消息代理。消息代理会接收来自生产者(publishers/producers)生产的消息,并将它们路由(route,可以理解成按指定规则转发)给相应的消费者(consumers)手中。

并且,由于AMQP是一种网络协议,所以消息代理、生产者、消费者可以分布在不同的机器上。

3.AMQP术语

除了上面提及的几个术语——消息代理、生产者、消费者、路由,AMQP模型中还有如下几个常用术语:交换器(exchange)、队列(queue)、绑定(binding)、通道(channel)等等。

4.交换器(exchange)

生产者生产消息至交换器之后,由交换器负责将消息转发出去。根据交换器类型的不同,交换器将消息转发出去的转发规则也会有所不同。目前,AMQP提供了四种不同的交换器类型:direct, fanout, topic, header。

除此之外,交换器还有许多属性可供设置。其中比较重要的属性有:name, durability, auto-delete, arguments。

(1)默认交换器

不指定交换器类型时,交换器默认是direct类型,其name属性为空。并且默认交换器有一个重要的特性:每个队列都会使用它的队列名字作为路由关键字(routing key)去自动地绑定到默认交换器上。

换言之,对于默认的交换器,无需用队列对其进行绑定操作,因为所有的队列都会自动的与之绑定。

但是,默认交换器与所有队列都进行绑定并不意味着交换器会将消息分发给所有的队列。交换器最终会将消息转发给哪个队列在生产者生产消息的时候就已经确定了——生产者生产消息时会指定消息路由关键字,这里的路由关键字即队列名。也就是说,消息会转发给与该路由关键字同名的队列中去。

(2)direct类型

该类型的交换器在分发消息到指定队列时要求:生产者生产某条消息时指定的路由关键字X与队列绑定到交换器上时指定的路由关键字Y完全一致,即X = Y。

而且,多个队列可以使用相同的路由关键字与交换器绑定。比如,生产者不断的生成路由关键字为X的消息,你可以指定多个队列都使用路由关键字Y与该交换器绑定(X=Y)。这样一来,消息就会被分发至多个队列中。但是,需要注意的是,消息负载均衡并不在于队列间,而是在于消费者间。

direct类型的更多内容,将在后面的文章中介绍。

(3)fanout类型

该类型的交换器会将消息转发给所有与之绑定的队列上。比如,有N个队列与fanout交换器绑定,当产生一条消息时,交换器会将该消息的N个副本分别发给每个队列。类似于广播机制。

direct类型的更多内容,将在后面的文章中介绍。

(4)topic类型

该类型的交换器会视消息路由关键字和绑定路由关键字之间的匹配情况,进行消息的路由转发。比如,消息路由关键字为x.y.z的消息会转发给绑定路由关键字为*.y.*的队列,也会转发给绑定路由关键字为x.#的队列,但却不会转发给绑定路由关键字为*.a.*的队列。

具体的路由关键字的匹配规则将在后面的文章中介绍。

(5)headers类型

该类型的交换器与前面介绍的稍有不同,它不再是基于路由关键字(routing key)进行路由,而是基于多个属性进行路由的,这些属性比路由关键字更容易表示为消息的头。也就是说,用于路由的属性是取自于消息header属性的,当消息header的值与队列绑定时指定的值相同时,消息就会路由至相应的队列中。

5.队列(Queues)

AMQP模型中的队列用来存储消费者待消费的消息。和交换器一样,队列也有许多可供设置的属性,并且除了与交换器一些相同的属性外,还有一些额外的属性。主要的属性有:name, durable, exclusive, auto-delete, arguments等等。

需要注意的是,队列在使用之前必须先声明。声明之前,如果该队列不存在,那么声明之后就会创建一个队列;如果该队列已经存在了,并且声明的队列与存在的队列属性相同,则不产生任何影响;如果该队列已经存在了,但是声明的队列与存在的队列属性不同,则会抛出一个错误码为406(PRECONDITION_FAILED)的异常。

(1)name属性

队列名支持最多255字节的UTF-8字符。应用程序在声明队列的时候可以自己指定队列名,或者当应用程序指定name属性为空时,代理(broker)会自动地为其生成一个唯一的队列名。

需要注意的是,以”amq.”开头的队列名是由AMQP内部使用的命名前缀,请开发者不要使用,否则将抛出403异常。

(2)durability属性

durability属性对应两种情况,分别是durable(持久的)和transient(短暂的)。durable类型的队列会持久化至硬盘上,所以当代理(broker)重启之后,它依然存在。相反地,当代理重启之后,transient类型的队列就消失了。

需要注意的是,队列的持久化是相对队列而言,对存储在持久化队列中的消息来说,当代理重启之后:队列还存在、消息则不存在。

所以,当broker重启之后,如果想让消息仍然存在,这就是消息持久化机制干的事了,后面再说消息属性的相关内容。

6.绑定(binding)

队列获取来自交换器的消息的前提是该队列必须先与交换器进行绑定。绑定之后,交换器才能将消息按照特定的规则,路由至相应的队列中去。随后,消费者才能从队列中消费消息。

如果生产者生产的某条消息,没有与之匹配的任何一个队列可供路由(比如,没有任何队列与交换器绑定)。那么,根据该条消息的属性,该消息要么丢弃,要么返回至生产者。

7.连接(connection)

我们已经知道,AMQP是一个应用层协议,并且是基于TCP可靠传输的应用层协议。除此之外,AMQP也提供了加密传输的机制(使用TSL或SSL),让消息传递更加安全。

当需要断开AMQP代理时,正确的做法是关闭AMQP连接,而不是粗鲁的直接断开其底层的TCP连接。

8.通道(channel)

有的应用程序需要与AMQP broker建立多个连接。在AMQP模型中,我们不需要通过建立太多的TCP连接来实现。假如针对每一个AMQP连接都建立一个TCP连接的话,会占用大量的系统资源。对此,AMQP提供了通道(channel)机制。即,共享一个TCP连接,可创建多个通道。

在多线程/进程的应用程序中正确做法是,对于每一个线程/进程,应分别建立一个通道,而不是多个线程/进程之间去共享一个通道。

9.虚拟机(virtual hosts)

AMQP使用了虚拟机的概念,在一个broker上面划分出多个隔离的环境(各环境下的用户、交换器以及队列等互不影响)。这样一来,AMQP客户端们在进行连接的时候,需要协商指定同一个vhost才能进行正常的往来业务。

10.消息确认(message acknowledgements)

试想一下这个问题:如果消费者应用程序在处理某个消息的时候突然奔溃了,那么这条消息该何去何从?再进一步的说,AMQP消息代理要在什么时候将某条消息从队列中移除?对此,AMQP给出了两种处理办法:一是,当消息代理(broker)将一条消息发送给消费者应用程序之后就将其从队列中移除;二是,当消费者应用程序返还一条确认信息之后(类似于TCP三次握手中的ack确认)就将其从队列中移除。

第一种处理方法是自动确认的,称为automatic acknowledgement;第二种处理方法则需要由消费者进行确认操作,称为explicit acknowledgement。针对第二种处理方法,消费者在何时返回ack也是比较灵活的。比如,消费者可以在接收到消息的第一时间就返回一个ack,或者在将消息持久化到硬盘之后返回ack,又或者在消息完全处理完之后返回ack。

如果一个接收到消息的消费者在没有返回ack之前就挂掉了,那么,AMQP消息代理将会将这条消息发送给其他能匹配上的消费者。但是,若没有任何消费者能够匹配上这条消息,AMQP代理将会一直等待,直到有能匹配上的消费者出现,再将该消息投递给它。

11.拒绝消息(rejecting messages)

消费者应用程序在处理消息失败时,应用程序可以通知broker:消息处理失败,拒绝消息。当拒绝消息时,消费者应用程序可以要求broker丢弃或重新发送该消息。

12预取消息(prefetching messages)

多个消费者从一个队列中消费消息时,可以指定每个消费者在返回下一个ack之前,每次发送给他们的消息数量,即预取消息的数量。这可以当做一个简单的负载均衡技术来使用,

也可以在批量生产消息的情景下提高吞吐量。

13.消息属性和有效负载(message attributes and payload)

AMQP模型中的消息实体各种属性,比如:

  • content type

  • content encoding

  • routing key

  • delivery mode(persistent or not)

  • message priority

  • message publish timestamp

  • expiration period

  • publisher application id等。

消息的属性是在消息被生产时设置的。

AMQP消息除了属性之外还有一个有效负载(可以理解为消息体,即它携带的数据),AMQP视这些负载数据为一个不透明的字节数组。并且,AMQP代理不会去窥探和改变这些负载的数据。对于这些负载的数据,你可以使用例如JSON这样的序列化格式来存储。

如果将消息属性设置为持久化,AMQP代理将会将这些持久化的消息写入磁盘,这可以保证当服务重启之后,消息不会丢失。

仍需要啰嗦的一点是:如果仅仅将一个消息生产到一个持久化的交换器,或者将这条消息路由到一个持久化的队列中去,并不能使这个消息本身变成持久化消息。消息的持久化与否主要取决于消息本身的属性设置。



上一篇: RabbitMQ基础知识 下一篇: Linux常用命令