RabbitMQ在Java端口的使用
1. 配置RabbitMQ相关信息
在application.yml中配置关于rabbitmq的信息
1 | spring: |
2. spring导入依赖
导入spring1
2
3
4
5
6
7
8
9
10
11
12
13
141.建议方式
//<!-- 移除手动指定的版本号 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.指定方式,容易出现Rabbtimq和amqp双方api不和问题
//Spring AMQP 包含 RabbitTemplate
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>3.1.0</version>
</dependency>
3. RabbitMQ生成,监听
3.1 RabbitMQ通过config类创建交换价和队列
将交换机和队列注入为bean自动创建1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class RabbitMQConfig {
// 声明交换机
public DirectExchange directExchange() {
// 创建一个持久化的直连交换机
return new DirectExchange("first.direct", true, false);
}
// 声明队列(red)
public Queue queue() {
// 创建一个持久化的队列(接受red KEY)
return new Queue("direct.queue", true);
}
// 声明队列(blue)
public Queue queue1() {
// 创建一个持久化的队列(接受blue KEY)
return new Queue("direct.queue2", true);
}
// 声明绑定关系
public Binding binding() {
// 将队列通过路由键绑定到交换机
return BindingBuilder.bind(queue())
.to(directExchange())
.with("red");
}
public Binding binding1() {
// 将队列通过路由键绑定到交换机
return BindingBuilder.bind(queue1())
.to(directExchange())
.with("blue");
}
// 如果需要更多的队列和交换机,可以定义更多的Bean
}
3.1.2 发送消息(Test类测试)
在生成队列时可以绑定队列的关键字key,在发送消息时,相应交换机的队列只会接收到相同关键字的队列1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RabbitMqApplicationTests {
RabbitTemplate rabbitTemplate;
//直接交换机消息
public void sendDirectMessageWithRed() {
String exchange = "first.direct";
String message = "红色警告,克苏鲁苏醒";
// 发送到指定的交换机和路由键
rabbitTemplate.convertAndSend(exchange,"red",message);
rabbitTemplate.convertAndSend(exchange,"blue","蓝色预警,异兽复苏");
}
//蓝色预警
public void sendDirectMessageWithBlue() {
String exchange = "first.direct";
String message = "蓝色预警,异兽复苏";
// 发送到指定的交换机和路由键
rabbitTemplate.convertAndSend(exchange,"blue",message);
}
}
3.1.2 监听消息
1 | //direct交换机,不同key接受的消息不同 |
3.2 通过注解的方式创建并绑定,监听队列和交换机
也是通过spring注册bean自动生成
注解生成,绑定多个关键字1
2
3
4
5
6
7
8
9//注解的方式生成交换机和队列并绑定
public void DirectListenr3(String msg){
System.out.println("direct.queue3消费者接受到的消息是:{"+msg+"}");
}
4. 生产者确认
4.1 配置文件
开启生产者确认1
2
3
4
5
6
7
8
9
10
11
12
13
14template:
retry:
enabled: true #开启重试机制
initial-interval: 1000ms #设置超时时间
multiplier: 1 #超时重试间隔倍数
max-attempts: 3 #重试次数限制
publisher-confirm-type: correlated
#三种机制
1. none(关闭)
2. correlated(MQ异步回调方式返回回执消息)
3. simple(同步阻塞等待mq回执消息)
publisher-returns: true #开启生产者确认机制
4.2 config类
回调信息1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class CommonConfig implements ApplicationContextAware {
private static final Logger log = LoggerFactory.getLogger(CommonConfig.class);
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// ... existing code ...
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
public void returnedMessage(ReturnedMessage returnedMessage) {
//配置回调
log.info("收到消息的returnback,exchange:{} , key:{} , msg:{} , code :{} , text:{}",
returnedMessage.getExchange(),returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),returnedMessage.getReplyCode(),
returnedMessage.getReplyText()
);
}
});
}
}
4.3 生产者发送类
若出现发送时的问题会日志显示,实际中可用作处理业务1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void testConfirmCallback() {
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
cd.getFuture().whenCompleteAsync((confirm, exception) -> {
if (exception != null) {
log.error("消息回调失败", exception);
} else {
log.debug("收到confirm callback回执");
if (confirm.isAck()) {
log.debug("消息发送成功,收到ack");
} else {
log.error("消息发送失败,收到nack, 原因: {}", confirm.getReason());
}
}
});
rabbitTemplate.convertAndSend("first.directad","red2","hello",cd);
}
5. 消费者确认
5.1 配置文件
在消费者出现错误时会使用
若消费者类中出现报错等异常情况会重新发送消息1
2
3
4
5
6
7
8
9
10listener:
simple:
prefetch: 1
acknowledge-mode: auto #none(关闭ack) , manual(手动ack) , auto(自动ack)
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始等待时间(1s)
multiplier: 1 #等待时间倍数
max-attempts: 3 #最大重试次数
stateless: true #true无状态,false有状态,业务中包含事务改为false
3. RabbitMQ使用
1 |
3. RabbitMQ使用
1 |
3. RabbitMQ使用
1 |
3. RabbitMQ使用
1 |
3. RabbitMQ使用
1 |
3. RabbitMQ使用
1 |