SpringCloud Stream 消息驱动

什么是SpringCloudStream

SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与SpringCloudStream中的binder对象交互。通过开发者来配置binding,而SpringCloudStream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与SpringCloudStream交互就可以方便使用消息驱动的方式。通过***Spring Integration***来连接消息代理中间件以实现消息时间驱动。SpringCloudStream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

为什么要使用SpringCloudStream

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构不同,像RabbitMQ有exchange,Kafka有Topic和Partitions分区。

这些中间件的差异性导致实际项目开发给开发者造成一定的困扰,如果用了两个消息队列中的其中一种,到时候后面需求变更,想往另外一种消息对立进行迁移,这时候无疑是灾难性的,一大堆的东西要重新写,因为它们与系统耦合了,然而SpringCloudStream提供了一种解耦的方式。

SpringCloudStream为什么可以统一差异

在没有绑定器这个概念的情况下,我们在SpringBoot应用要直接与消息中间件进行信息交互的时候,由于每个消息中间件构建的初衷不同,它们实现细节上会有较大的差异性。

通过定义绑定器作为中间层,完美的实现了应用程序与消息中间件之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

SpringCloudStream标准流程

消息生产者SpringCloudStreamSourceChannelBinder业务逻辑消息生产者SpringCloudStreamSourceChannelBinder业务逻辑MQ组件RabbitMQkafka...
  • Binder:很方便的连接中间件,屏蔽差异。
  • Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
  • Source和Sink:简单的可理解为参照对象是SpringCloudStream自身,从Stream发布消息就是输出,接收消息就是输入。

常用注解

  • @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
  • @output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
  • @StreamListener:监听队列,用于消费者的队列接收消息。
  • @EnableBinding:指信道channel和exchange绑定在一起。

Provider代码

cloud:
  stream:
    binders:                                  #在此处配置需要绑定的rabbitMQ的服务信息
      defaultRabbit:                          #表示定义的名称,用于binding整合
        type: rabbit                          #表示组件类型
        environment:                          #设置rabbitMQ的相关环境配置
          spring:
            rabbitmq:
              host: localhost
              port: 5672
              username: guest
              password: guest
    bindings:                                 #服务的整合处理
      output:                                 #这个名字是一个通道名称
        destination: xxxExchange              #表示要使用的Exchange名称定义
        content-type: application/json        #设置消息类型
        binder: defaultRabbit                 #设置要绑定的消息服务的具体设置
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl{
    /* 消息发送管道 */
    @Resource
    private MessageChannel output;
    /**
     * 发送消息
     *
     * @param msg
     *        发送的消息内容
     * @return {@link String}
     * @author Error
     */
    public void send(String msg) {
        output.send(MessageBuilder.withPayload(msg).build());
    }
}

Consumer代码

cloud:
  stream:
    binders:                                  #在此处配置需要绑定的rabbitMQ的服务信息
      defaultRabbit:                          #表示定义的名称,用于binding整合
        type: rabbit                          #表示组件类型
        environment:                          #设置rabbitMQ的相关环境配置
          spring:
            rabbitmq:
              host: localhost
              port: 5672
              username: guest
              password: guest
    bindings:                                 #服务的整合处理
      input:                                 #这个名字是一个通道名称
        destination: xxxExchange              #表示要使用的Exchange名称定义
        content-type: application/json        #设置消息类型
        binder: defaultRabbit                 #设置要绑定的消息服务的具体设置
@EnableBinding(Sink.class) //定义消息的推送管道
public class MessageConsumer {
    /**
    *  消费消息
    * @param message
    *        接收的消息
    * @author Error
    */
    @StreamListener(Sink.INPUT)
    public void consume(Message<String> message){
        message.getPayload();
    }
}

消息分组

消息分组

当做集群部署时,存在多个消费者,都会从MQ中消费消息,如果同一个消息被多个消费者消费就可能造成*重复消费*。这种情况发生时可以使用Stream中的消息分组来解决。

注意在Stream中处于同一个group中的多个消费者时竞争关系,就能够保证消息只会被其中一个消费者消费。*不同组的是可以全面消费的(重复消费)*

分组设置

cloud:
  stream:
    bindings:                                 #服务的整合处理
      input:                                  #这个名字是一个通道名称
        destination: xxxExchange              #表示要使用的Exchange名称定义
        content-type: application/json        #设置消息类型
        binder: defaultRabbit                 #设置要绑定的消息服务的具体设置
        group: groupA                         #设置消费者分组