1. 配置RabbitMQ相关信息

在application.yml中配置关于rabbitmq的信息

1
2
3
4
5
6
7
spring:
rabbitmq:
host: 172.16.207.227 //根据实际的地址(本地部署可以为localhost),这里指虚拟机Ubuntu
username: guest //默认用户
password: guest //默认密码
port: 5673 //RabbitMQ官方指定的端口号
virtual-host: / //默认的虚拟端口,在admin中更改

2. spring导入依赖

导入spring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1.建议方式
//<!-- 移除手动指定的版本号 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.指定方式,容易出现Rabbtimq和amqp双方api不和问题
//Spring AMQP 包含 RabbitTemplate
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>3.1.0</version>
</dependency>

3. RabbitMQ生成,监听

3.1 RabbitMQ通过config类创建交换价和队列

将交换机和队列注入为bean自动创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

@Configuration
public class RabbitMQConfig {
// 声明交换机
@Bean
public DirectExchange directExchange() {
// 创建一个持久化的直连交换机
return new DirectExchange("first.direct", true, false);
}

// 声明队列(red)
@Bean
public Queue queue() {
// 创建一个持久化的队列(接受red KEY)
return new Queue("direct.queue", true);
}

// 声明队列(blue)
@Bean
public Queue queue1() {
// 创建一个持久化的队列(接受blue KEY)
return new Queue("direct.queue2", true);
}

// 声明绑定关系
@Bean
public Binding binding() {
// 将队列通过路由键绑定到交换机
return BindingBuilder.bind(queue())
.to(directExchange())
.with("red");
}
@Bean
public Binding binding1() {
// 将队列通过路由键绑定到交换机
return BindingBuilder.bind(queue1())
.to(directExchange())
.with("blue");
}

// 如果需要更多的队列和交换机,可以定义更多的Bean
}

3.1.2 发送消息(Test类测试)

在生成队列时可以绑定队列的关键字key,在发送消息时,相应交换机的队列只会接收到相同关键字的队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

@SpringBootTest
class RabbitMqApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;

//直接交换机消息
@Test
public void sendDirectMessageWithRed() {
String exchange = "first.direct";
String message = "红色警告,克苏鲁苏醒";
// 发送到指定的交换机和路由键
rabbitTemplate.convertAndSend(exchange,"red",message);
rabbitTemplate.convertAndSend(exchange,"blue","蓝色预警,异兽复苏");
}
//蓝色预警
@Test
public void sendDirectMessageWithBlue() {
String exchange = "first.direct";
String message = "蓝色预警,异兽复苏";
// 发送到指定的交换机和路由键
rabbitTemplate.convertAndSend(exchange,"blue",message);
}

}

3.1.2 监听消息

1
2
3
4
5
6
7
8
9
//direct交换机,不同key接受的消息不同
@RabbitListener(queues = "direct.queue")
public void DirectListenr(String msg){
System.out.println("direct.queue消费者接受到的消息是:{"+msg+"}");
}
@RabbitListener(queues = "direct.queue2")
public void DirectListenr2(String msg){
System.out.println("direct.queue2消费者接受到的消息是:{"+msg+"}");
}

3.2 通过注解的方式创建并绑定,监听队列和交换机

也是通过spring注册bean自动生成
注解生成,绑定多个关键字

1
2
3
4
5
6
7
8
9
//注解的方式生成交换机和队列并绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"), //队列名称
exchange = @Exchange(name = "first.direct" , type = ExchangeTypes.DIRECT), //交换机名称和类型
key = {"red" , "blue"} //关键字
))
public void DirectListenr3(String msg){
System.out.println("direct.queue3消费者接受到的消息是:{"+msg+"}");
}

4. 生产者确认

4.1 配置文件

开启生产者确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template:
retry:
enabled: true #开启重试机制
initial-interval: 1000ms #设置超时时间
multiplier: 1 #超时重试间隔倍数
max-attempts: 3 #重试次数限制

publisher-confirm-type: correlated
#三种机制
1. none(关闭)
2. correlated(MQ异步回调方式返回回执消息)
3. simple(同步阻塞等待mq回执消息)

publisher-returns: true #开启生产者确认机制

4.2 config类

回调信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class CommonConfig implements ApplicationContextAware {
private static final Logger log = LoggerFactory.getLogger(CommonConfig.class);

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// ... existing code ...
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
//配置回调
log.info("收到消息的returnback,exchange:{} , key:{} , msg:{} , code :{} , text:{}",
returnedMessage.getExchange(),returnedMessage.getRoutingKey(),
returnedMessage.getMessage(),returnedMessage.getReplyCode(),
returnedMessage.getReplyText()
);
}
});

}
}

4.3 生产者发送类

若出现发送时的问题会日志显示,实际中可用作处理业务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
void testConfirmCallback() {
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());

cd.getFuture().whenCompleteAsync((confirm, exception) -> {
if (exception != null) {
log.error("消息回调失败", exception);
} else {
log.debug("收到confirm callback回执");
if (confirm.isAck()) {
log.debug("消息发送成功,收到ack");
} else {
log.error("消息发送失败,收到nack, 原因: {}", confirm.getReason());
}
}
});

rabbitTemplate.convertAndSend("first.directad","red2","hello",cd);
}

5. 消费者确认

5.1 配置文件

在消费者出现错误时会使用
若消费者类中出现报错等异常情况会重新发送消息

1
2
3
4
5
6
7
8
9
10
listener:
simple:
prefetch: 1
acknowledge-mode: auto #none(关闭ack) , manual(手动ack) , auto(自动ack)
retry:
enabled: true #开启消费者失败重试
initial-interval: 1000ms #初始等待时间(1s)
multiplier: 1 #等待时间倍数
max-attempts: 3 #最大重试次数
stateless: true #true无状态,false有状态,业务中包含事务改为false

3. RabbitMQ使用

1

3. RabbitMQ使用

1

3. RabbitMQ使用

1

3. RabbitMQ使用

1

3. RabbitMQ使用

1

3. RabbitMQ使用

1