RabbitMQ 是一个开源的消息代理中间件,基于 AMQP 协议,用于实现应用程序之间的异步消息传递和解耦。

依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

yaml配置

server:
  port: 8081
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 35.229.236.44
    port: 10309
    username: itcast
    password: 123321
    virtual-host: /
    listener:
      simple:
        prefetch: 1

prefetch: 1的意思是消费者在处理完一个消息后可以立刻再得到一个消息

直接发送消息

    @Test
    public void testSendMsg2Queue() {
        String queueName = "simple.queue";//队列名
        String msg = "hello spring amqp";
        rabbitTemplate.convertAndSend(queueName, msg);
    }

直接接收消息

    @RabbitListener(queues = "simple.queue")//监听队列名
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到simple.queue的消息是:["+ msg +"]" + LocalTime.now());
        Thread.sleep(20);
    }

Fanout交换机

Fanout Exchange 是一种 RabbitMQ 的交换机类型,它将接收到的消息广播到所有绑定到它的队列,不考虑路由键,从而实现消息的多路复用。

创建并绑定交换机和队列

@Configuration
public class FanoutConfig {
    //itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("itcastFanoutExchange");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanoutQueue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanoutQueue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

}

发消息

    @Test
    public void testFanout() {
        //交换机名称
        String exchangeName = "itcastFanoutExchange";
        //消息
        String  msg = "hello fanoutExchange";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "", msg);
    }

接收消息

仍然是监听队列, 其他都一样

    @RabbitListener(queues = "fanoutQueue2")
    public void listenFanoutQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到fanoutQueue1的消息是:["+ msg +"]" + LocalTime.now());
        Thread.sleep(20);
    }

Direct

Direct 交换机需要指定routekey和bindingkey, 路由只会把消息推送到bindingkey和当前路由routingkey一致的消息队列(bindingkey可设置多个)

监听方法

    //direct交换机所对应的queue1
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到fanoutQueue1的消息是:["+ msg +"]" + LocalTime.now());
        Thread.sleep(20);
    }

消息发送方法
跟fanout其实是一样的, 只要指定把消息发送到directadmin和routekey就可以了

    @Test
    public void testDirectYellow() {
        //交换机名称
        String exchangeName = "itcast.direct";
        //消息
        String  msg = "hello DirectExchange";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "yellow", msg);
    }
//convertAndSend的第二个参数就是bindingkey

Topic

Direct 唯一的区别就是 bindingkey 支持通配符
routingkey 支持多个单词(用.隔开)
监听器

    //topicExchange绑定的queue2
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到topic.Queue2的消息是:["+ msg +"]" + LocalTime.now());
    }

发消息

    @Test
    public void testTopicQueue2() {
        //交换机名称
        String exchangeName = "itcast.topic";
        //消息
        String  msg = "南昌下大雨了😭😭";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
    }

MessageConverter消息转换器

jackson序列化器

        <!--jackson序列化器-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
   @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
//修改amqp底层的消息转换器

显式声明队列

都要配置为bean

@Configuration
public class FanoutConfig {
    //itcast.fanout
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("itcastFanoutExchange");
    }
    //fanout.queue1
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanoutQueue1");
    }
    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    //fanout.queue2
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanoutQueue2");
    }
    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    @Bean
    public Queue objectQueue() {
        return new Queue("objectQueue");
    }

}

监听的时候顺带配Queue

这个方法所属的类同样需要@Component
因为@RabbitListener的功能依赖于Spring的上下文和生命周期管理。如果类没有被Spring管理,它的方法将无法被自动调用,消息也就无法被消费。

 //这个方法所属的类同样需要@Component

   @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2接收到topic.Queue2的消息是:["+ msg +"]" + LocalTime.now());
    }