https://kangshitao.github.io/2021/10/26/rabbitmq/

基础知识:

image.png
2.消息应答:**消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。 **

1.安装:

Ubuntu20.04安装RabbitMQ,并配置远程调用,详细教程_ubuntu20.04 rabbitmq安装使用-CSDN博客

2.hello项目:

1
2
3
4
5
6
7
1.生产者创建连接工厂,连接工厂创建连接
2.发送消息(创建Channel)
3.消费者创建连接工厂,连接工厂创建连接(需要提供服务器地址,端口,用户,密码)
4.接收信息(创建Channel,
5.消费者内部有消费成功的回调函数和失败的回调函数,消费者会监听生产者生成消息
6.当只有一个生产者,多个消费者的时候,消费者默认按照轮询的方式消费信息
7.

消息应答:

image.png
1.默认为自动应答,也就是信息发出去后队列就删除这条信息。我们可以设置为手动应答,当我们发送成功后,但是某一个消费者挂掉了,这个时候发送丢失的消息就会重写入队。

持久化:

1.保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。
image.png
image.png

发布确认:

1
2
//开启发布确认
channel.confirmSelect();
1
2
//服务端返回 false 或超时时间内未返回,生产者可以消息重发,等待发布的确认
boolean flag = channel.waitForConfirms();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.rabbitmq.client.*;

public class AsyncPublisher {

private static final String EXCHANGE_NAME = "async_exchange";
private static final String ROUTING_KEY = "async_routing_key";
private static final String QUEUE_NAME = "async_queue";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

// 创建连接
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 声明交换机和队列
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

// 设置发布者确认模式
channel.confirmSelect();

// 异步确认发布的回调函数
ConfirmCallback confirmCallback = (deliveryTag, multiple) -> {
System.out.println("消息发布成功,deliveryTag: " + deliveryTag);
};

// 异步确认发布失败的回调函数
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.err.println("消息发布失败,deliveryTag: " + deliveryTag);
};

// 设置异步确认发布的回调函数
channel.addConfirmListener(confirmCallback, nackCallback);

// 发布消息
String message = "Hello, RabbitMQ!";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

// 等待回调确认
channel.waitForConfirmsOrDie();

System.out.println("消息发布完成");
}
}
}

交换机: