RabbitMQ 是一个开源的消息代理中间件,基于 AMQP 协议,用于实现应用程序之间的异步消息传递和解耦。
依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml配置
server:
port: 8081
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 35.229.236.44
port: 10309
username: itcast
password: 123321
virtual-host: /
listener:
simple:
prefetch: 1
prefetch: 1的意思是消费者在处理完一个消息后可以立刻再得到一个消息
直接发送消息
@Test
public void testSendMsg2Queue() {
String queueName = "simple.queue";//队列名
String msg = "hello spring amqp";
rabbitTemplate.convertAndSend(queueName, msg);
}
直接接收消息
@RabbitListener(queues = "simple.queue")//监听队列名
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到simple.queue的消息是:["+ msg +"]" + LocalTime.now());
Thread.sleep(20);
}
Fanout交换机
Fanout Exchange 是一种 RabbitMQ 的交换机类型,它将接收到的消息广播到所有绑定到它的队列,不考虑路由键,从而实现消息的多路复用。
创建并绑定交换机和队列
@Configuration
public class FanoutConfig {
//itcast.fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcastFanoutExchange");
}
//fanout.queue1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanoutQueue1");
}
//绑定队列1到交换机
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
//fanout.queue2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanoutQueue2");
}
//绑定队列2到交换机
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
}
发消息
@Test
public void testFanout() {
//交换机名称
String exchangeName = "itcastFanoutExchange";
//消息
String msg = "hello fanoutExchange";
//发送消息
rabbitTemplate.convertAndSend(exchangeName, "", msg);
}
接收消息
仍然是监听队列, 其他都一样
@RabbitListener(queues = "fanoutQueue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到fanoutQueue1的消息是:["+ msg +"]" + LocalTime.now());
Thread.sleep(20);
}
Direct
Direct 交换机需要指定routekey和bindingkey, 路由只会把消息推送到bindingkey和当前路由routingkey一致的消息队列(bindingkey可设置多个)
监听方法
//direct交换机所对应的queue1
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到fanoutQueue1的消息是:["+ msg +"]" + LocalTime.now());
Thread.sleep(20);
}
消息发送方法
跟fanout其实是一样的, 只要指定把消息发送到directadmin和routekey就可以了
@Test
public void testDirectYellow() {
//交换机名称
String exchangeName = "itcast.direct";
//消息
String msg = "hello DirectExchange";
//发送消息
rabbitTemplate.convertAndSend(exchangeName, "yellow", msg);
}
//convertAndSend的第二个参数就是bindingkey
Topic
和 Direct 唯一的区别就是 bindingkey 支持通配符
routingkey 支持多个单词(用.隔开)
监听器
//topicExchange绑定的queue2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到topic.Queue2的消息是:["+ msg +"]" + LocalTime.now());
}
发消息
@Test
public void testTopicQueue2() {
//交换机名称
String exchangeName = "itcast.topic";
//消息
String msg = "南昌下大雨了😭😭";
//发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
}
MessageConverter消息转换器
jackson序列化器
<!--jackson序列化器-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
//修改amqp底层的消息转换器
显式声明队列
都要配置为bean
@Configuration
public class FanoutConfig {
//itcast.fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("itcastFanoutExchange");
}
//fanout.queue1
@Bean
public Queue fanoutQueue1() {
return new Queue("fanoutQueue1");
}
//绑定队列1到交换机
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
//fanout.queue2
@Bean
public Queue fanoutQueue2() {
return new Queue("fanoutQueue2");
}
//绑定队列2到交换机
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
@Bean
public Queue objectQueue() {
return new Queue("objectQueue");
}
}
监听的时候顺带配Queue
这个方法所属的类同样需要@Component
因为@RabbitListener的功能依赖于Spring的上下文和生命周期管理。如果类没有被Spring管理,它的方法将无法被自动调用,消息也就无法被消费。
//这个方法所属的类同样需要@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = {"#.news"}
))
public void listenTopicQueue2(String msg) throws InterruptedException {
System.err.println("消费者2接收到topic.Queue2的消息是:["+ msg +"]" + LocalTime.now());
}