目录
资料下载地址:day05MQ高级
消息丢失的三大类:
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
需要注意的是,确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
在publisher模块中配置如下内容
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
在生产者模块中配置全局ReturnCallback(一个RabbitTemplate只能配置一个ReturnCallback)
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate template = applicationContext.getBean(RabbitTemplate.class);
template.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode,replyText,exchange,routingKey,message.toString());
}));
}
}
进行测试
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String routingKey = "simple";
String message = "hello, spring amqp!";
//准备消息id
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(
result->{
if (result.isAck()){
log.debug("消息发送成功,ID:{}",correlationData.getId());
}else {
log.error("消息发送失败,ID:{},原因{}",correlationData.getId(),result.getReason());
}
},
ex->{
log.error("消息发送异常,ID:{},原因:{}",correlationData.getId(),ex.getMessage());
}
);
rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData);
}
}
运行观察控制台
测试一种路由失败的情况,这种情况可以正常发送到交换机,但是不能发送到Queue
MQ默认是内存存储,当服务重启后,数据就会丢失。因此我们需要对交换机与队列进行持久化操作。在消费者模块添加如下代码
@Configuration
public class CommonConfig {
@Bean
public DirectExchange directExchange(){
/**
* name:交换机名称
* durable:是否持久化
* autoDelete:当没有队列绑定时是否删除
*/
return new DirectExchange("direct.exchange",true,false);
}
@Bean
public Queue simpleQueue(){
/**
* 使用Builder创建持久化队列
* 使用 new Queue("名称")创建也可以,默认就是持久化的
*/
return QueueBuilder.durable("simple.queue").build();
}
}
启动消费者,就可以看到交换机与队列被持久到磁盘中,但需要注意的时,消息并没有持久化,当重启服务器消息还是会丢失。之前我们发送的消息是String类型,现在,我们使用AMQP的Message对消息进行持久化。
@Test
public void testDurableMessage() throws Exception {
Message msg = MessageBuilder.withBody("hello spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("simple.queue",msg);
}
RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
manual模式对代码有一定入侵,需要添加发送ack的代码。因此不推荐使用
auto模式是通过Spring的AOP机制,来对消息进行自动确认。推荐使用
none模式不对消息进行确认,不使用
在消费者模块的配置文件中配置如下内容
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
进行测试,在监听器处添加错误代码
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
System.out.println(1/0);
}
}
进行debug观察Rabbit控制台
对断点放行,会发现控制台抛出错误后立即再进入断点,那么就可以确定,MQ会再次投递失败的消息。取消断点放行,会发现控制台无休止进行打印错误,这种处理方式并不友好,因此我们可以自定义失败重试机制。
当消费者消费消息抛出异常后,会将消息投递给MQ。而MQ又会立即投递给消费者。这样循环往复会导致MQ的消息处理飙升,带来不必要的压力。因此我们可以采用Spring的重试机制(在本地重试,不返回ack也不返回nack),来避免这种情况。
消费者模块的配置文件添加如下内容
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000 #初始的失败等待时长为1s
multiplier: 2 #下次失败的等待时长倍数,下次灯带时长 = multiplier * last-interval
max-attempts: 3 #最大重试次数
stateless: true # true无状态;false有状态,如果业务中包含事务,这里改为false
接下来进行测试
首先是重试时间分别为1,2对应着配置中的1s与1s*2,如果还有下次重试次数那么重试时间就是1s*2*2。其次是在RabbitMQ中找不到这条错误的消息了。具体原因如下
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
添加一个新的Config
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.exchange");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue",true);
}
@Bean
public Binding errorBinging(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");
}
}
重启发送一条消息测试
观察Rabbit的控制台
当一个队列中的消息满足下列情况之一时,可以成为死信 (dead letter):
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)
与RepublishRecoverer的区别在于,该种方式是通过MQ进行转发,而RepublishRecoverer是通过消费者进行转发。如果只是保存失败的消息,那么推荐使用RepublishRecoverer。
TTL(time to live)超时时间分为两种情况:
当消息到达存活时间后还没有被消费会被自动清除。如果同时设置了消息过期时间和队列过期时间,以时间短的为准,队列过期会将所有消息移除,如果一个已经过期的消息不在队列顶端时并不会立即移除,一旦它到了队列顶端则会进行判断是否移除。
我们可以通过TTL来实现一个延时队列,对消息设置过期时间存放在ttl.queue,但是没有消费者监听该队列,等到过期之后,放入死信队列,而消费者监听死信队列,对过期消息进行消费,从而实现延时队列。具体流程如下
接下来实现延时队列
编写ttl部分
@Slf4j
@Configuration
public class TTLMessageConfig {
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue")
.ttl(10000)//超时时间
.deadLetterExchange("dl.exchange")//指定死信队列
.deadLetterRoutingKey("dl")//死信队列的路由key
.build();
}
@Bean
public DirectExchange ttlExchange(){
return new DirectExchange("ttl.exchange");
}
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
}
}
编写消费者方的监听
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "dl.queue",durable = "true"),
exchange = @Exchange(name = "dl.exchange"),
key = "dl"
))
public void listenDlQueue(String msg){
log.info("消费者接收到了延时消息:{}",msg);
}
编写测试方法
@Test
public void testTTLMessage() throws Exception {
Message msg = MessageBuilder.withBody("hello TTL".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
rabbitTemplate.convertAndSend("ttl.exchange","ttl",msg);
log.info("消息成功发送!");
}
至此实现了延时处理消息。
但是通过死信队列来实现延迟队列的做法有点麻烦,我们可以使用RabbitMQ的原生插件DelayExchange来实现这个功能。
基于Linux下载插件文档:Scheduling Messages with RabbitMQ | RabbitMQ - Blog
而我们是基于Docker下载插件,下载地址为:Community Plugins — RabbitMQ
需要注意RabbitMQ与插件的版本对应关系即可。下载好之后,将其拖入mq的数据卷中。接下来安装插件,如要进入docker容器中
#进入容器
docker exec -it mq bash
#开启插件功能
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
这样就安装完毕了。
DelayExchange插件的原理是对官方原生的Exchange做了功能的升级:
DelayExchange的声明是在RabbitMQ的控制台中
其次,消息的延迟时间也需要在Exchange中指定
控制台的使用方法肯定不符合开发中使用,因此我们接下来使用代码使用该插件
声明延迟队列交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "delay.queue",durable = "true"),
exchange = @Exchange(name = "delay.exchange",delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg){
log.info("接收到延迟消息:{}",msg);
}
编写测试类
@Test
public void testDelayMessage() throws Exception {
Message msg = MessageBuilder.withBody("hello delay".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.setHeader("x-delay",5000)
.build();
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.exchange","delay",msg,correlationData);
log.info("消息成功发送!");
}
因此,我们需要修改发送方的判断逻辑。判断是否存在receiveDelay值
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate template = applicationContext.getBean(RabbitTemplate.class);
template.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) -> {
//如果是延迟消息,则直接返回
if (message.getMessageProperties().getReceivedDelay()>0) {
return;
}
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode,replyText,exchange,routingKey,message.toString());
}));
}
}
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种种思路:
前两种情况是解决消息堆积问题,后一种是环节消息堆积问题。开启线程池也需要看情况,对于处理时间短的消息会因为频繁的CPU上下文切换导致CPU占用内存,因此开启线程池适用于处理消息时间长的情况。
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。惰性队列的特征如下
之所以引用惰性队列,就是为了提高消息的堆积能力,传统的RabbitMQ的消息默认是存储在内存当中,当并发量高的时候,容易造成消息堆积,当占用内存百分之40时,MQ会暂时停止生产者的消息投递,将一部分消息保存在磁盘中,从到导致暂时的不可用状态,MQ的性能也就忽高忽低。而惰性队列是直接保存在磁盘当中,保证了MQ的稳定性,但损耗了性能。
使用AMQP实现惰性队列的声明
//基于注解
@RabbitListener(queuesToDeclare = @Queue(name = "lazy.queue"
,durable = "true",
arguments = @Argument(name = "x-queue-mode",value = "lazy"))
)
public void listenLazyQueue(String msg){
log.info("接收到延迟消息:{}",msg);
}
//基于Bean
@Configuration
public class LazyConfig {
@Bean
public Queue LazyQueue(){
return QueueBuilder.durable("lazy.queue").lazy().build();
}
}
同其他中间件解决高可用的方法一样,那就是搭建集群。
RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:
镜像集群虽然支持主从,但主从同步并不是强一致的,在同步期间可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。
普通集群,或者叫标准集群 (classic cluster),具备下列特征:
现在开始搭建集群
集群模式中的每个RabbitMQ节点使用Cookie来确定它们是否被允许相互通信。
要使两个节点能够通信,它们必须具有相同的共享秘密,称为Erlang cookie。Cookie只是一串最多255个字符的字母数字字符。
每个集群节点必须具有相同的Cookie。实例之间也需要它来相互通信。
从一个启动的MQ实例获取Cookie
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
获取内容为:XSLBYZHRKDOLAMZYTNML
在/tmp目录新建一个配置文件 rabbitmq.conf:
cd /tmp
# 创建文件
touch rabbitmq.conf
文件内容如下:
loopback_users.guest = false #禁用guest用户访问
listeners.tcp.default = 5672 #访问端口
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
cluster_formation.classic_config.nodes.1 = rabbit@mq1 #节点名称
cluster_formation.classic_config.nodes.2 = rabbit@mq2
cluster_formation.classic_config.nodes.3 = rabbit@mq3
接下来再创建一个配置文件用来存放Cookie信息
# 创建文件
touch .erlang.cookie
# 写入cookie
echo "XSLBYZHRKDOLAMZYTNML" > .erlang.cookie
# 修改cookie文件的权限为只读,不允许其他人修改
chmod 600 .erlang.cookie
mkdir mq1 mq2 mq3
# 将配置文件拷贝到其他文件夹
cp rabbitmq.conf mq1
cp rabbitmq.conf mq2
cp rabbitmq.conf mq3
cp .erlang.cookie mq1
cp .erlang.cookie mq2
cp .erlang.cookie mq3
准备一个docker网络
docker network create mq-net
启动容器
# 启动第一个容器
docker run -d --net mq-net \
-v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq1 \
--hostname mq1 \
-p 8071:5672 \
-p 8081:15672 \
rabbitmq:tag
# 启动第二个容器
docker run -d --net mq-net \
-v ${PWD}/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq2 \
--hostname mq2 \
-p 8072:5672 \
-p 8082:15672 \
rabbitmq:3.8-management
# 启动第三个容器
docker run -d --net mq-net \
-v ${PWD}/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
-v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq3 \
--hostname mq3 \
-p 8073:5672 \
-p 8083:15672 \
rabbitmq:tag
访问8081端口
在8081中创建一个队列,在8082与8083中查看是否可以查看该队列信息
可以正常看到队列信息。
镜像集群:本质是主从模式,具备下面的特征
看图解Rabbit的镜像集群类似于ES的数据分片。
镜像集群有三个模式
ha-mode |
ha-params |
效果 |
准确模式 exactly |
队列的副本量 count |
集群中队列副本(主服务器和镜像服务器之和)的数量。count如果为1意味着单个副本:即队列主节点。count值为2表示2个副本:1个队列主和1个队列镜像。换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。 |
all |
(none) |
队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。推荐使用exactly,设置副本数为(N / 2 +1)。 |
nodes |
node names |
指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。 |
配置模式的命令
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
rabbitmqctl set_policy
:固定写法ha-two
:策略名称,自定义^two\.
":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.
开头的队列名称{
"ha-mode
":
"exactly
",
"ha-params
":2,
"ha-sync-mode
":
"automatic
"}
': 策略内容
ha-mode
":
"exactly
":策略模式,此处是exactly模式,指定副本数量ha-params
":2
:策略参数,这里是2,就是副本数量为2,1主1镜像ha-sync-mode
":
"automatic
":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
ha-all
:策略名称,自定义^all\.
":匹配所有以all.
开头的队列名{
"ha-mode
":
"all
"}
':策略内容
"ha-mode":"all"
:策略模式,此处是all模式,即所有节点都会称为镜像节点rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
rabbitmqctl set_policy
:固定写法ha-nodes
:策略名称,自定义^nodes\.
":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.
开头的队列名称{
"ha-mode
":
"nodes
",
"ha-params
":[
"rabbit@nodeA
",
"rabbit@nodeB
"]}
': 策略内容
ha-mode
":
"nodes
":策略模式,此处是nodes模式ha-params
":[
"rabbit@mq1
",
"rabbit@mq2
"]
:策略参数,这里指定副本所在节点名称进入mq1容器
接下来宕机mq1观察队列变化
随后再次启动mq1,two.queue队列也和mq1没有关系了。
仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:
修改配置文件,配置节点信息
spring:
rabbitmq:
addresses: 192.168.116.131:8071,192.168.116.131:8072,192.168.116.131:8073
username: admin
password: admin
virtual-host: /
创建队列
@Configuration
public class QuorumConfig {
@Bean
public Queue quorumQueue(){
return QueueBuilder
.durable("quorum.queue2")
.quorum()
.build();
}
}
启动消费者就可以看到已经创建出quorum.queue2队列了
更多【rabbitmq-RabbitMQ的高级特性】相关视频教程:www.yxfzedu.com