https://kangshitao.github.io/2021/10/26/rabbitmq/
基础知识:

2.消息应答:**消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。 **
1.安装:
Ubuntu20.04安装RabbitMQ,并配置远程调用,详细教程_ubuntu20.04 rabbitmq安装使用-CSDN博客
2.hello项目:
1 2 3 4 5 6 7
| 1.生产者创建连接工厂,连接工厂创建连接 2.发送消息(创建Channel) 3.消费者创建连接工厂,连接工厂创建连接(需要提供服务器地址,端口,用户,密码) 4.接收信息(创建Channel, 5.消费者内部有消费成功的回调函数和失败的回调函数,消费者会监听生产者生成消息 6.当只有一个生产者,多个消费者的时候,消费者默认按照轮询的方式消费信息 7.
|
消息应答:

1.默认为自动应答,也就是信息发出去后队列就删除这条信息。我们可以设置为手动应答,当我们发送成功后,但是某一个消费者挂掉了,这个时候发送丢失的消息就会重写入队。
持久化:
1.保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。


发布确认:
1 2
| //开启发布确认 channel.confirmSelect();
|
1 2
| //服务端返回 false 或超时时间内未返回,生产者可以消息重发,等待发布的确认 boolean flag = channel.waitForConfirms();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import com.rabbitmq.client.*;
public class AsyncPublisher {
private static final String EXCHANGE_NAME = "async_exchange"; private static final String ROUTING_KEY = "async_routing_key"; private static final String QUEUE_NAME = "async_queue";
public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
// 创建连接 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
// 声明交换机和队列 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 设置发布者确认模式 channel.confirmSelect();
// 异步确认发布的回调函数 ConfirmCallback confirmCallback = (deliveryTag, multiple) -> { System.out.println("消息发布成功,deliveryTag: " + deliveryTag); };
// 异步确认发布失败的回调函数 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.err.println("消息发布失败,deliveryTag: " + deliveryTag); };
// 设置异步确认发布的回调函数 channel.addConfirmListener(confirmCallback, nackCallback);
// 发布消息 String message = "Hello, RabbitMQ!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 等待回调确认 channel.waitForConfirmsOrDie();
System.out.println("消息发布完成"); } } }
|
交换机: