SpringCloud Stream 消息驱动
什么是SpringCloudStream
SpringCloudStream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与SpringCloudStream中的binder对象交互。通过开发者来配置binding,而SpringCloudStream的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与SpringCloudStream交互就可以方便使用消息驱动的方式。通过***Spring Integration***来连接消息代理中间件以实现消息时间驱动。SpringCloudStream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
为什么要使用SpringCloudStream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构不同,像RabbitMQ有exchange,Kafka有Topic和Partitions分区。
这些中间件的差异性导致实际项目开发给开发者造成一定的困扰,如果用了两个消息队列中的其中一种,到时候后面需求变更,想往另外一种消息对立进行迁移,这时候无疑是灾难性的,一大堆的东西要重新写,因为它们与系统耦合了,然而SpringCloudStream提供了一种解耦的方式。
SpringCloudStream为什么可以统一差异
在没有绑定器这个概念的情况下,我们在SpringBoot应用要直接与消息中间件进行信息交互的时候,由于每个消息中间件构建的初衷不同,它们实现细节上会有较大的差异性。
通过定义绑定器作为中间层,完美的实现了应用程序与消息中间件之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
SpringCloudStream标准流程
- Binder:很方便的连接中间件,屏蔽差异。
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
- Source和Sink:简单的可理解为参照对象是SpringCloudStream自身,从Stream发布消息就是输出,接收消息就是输入。
常用注解
- @Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序。
- @output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
- @StreamListener:监听队列,用于消费者的队列接收消息。
- @EnableBinding:指信道channel和exchange绑定在一起。
Provider代码
cloud:
stream:
binders: #在此处配置需要绑定的rabbitMQ的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #表示组件类型
environment: #设置rabbitMQ的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #这个名字是一个通道名称
destination: xxxExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl{
/* 消息发送管道 */
@Resource
private MessageChannel output;
/**
* 发送消息
*
* @param msg
* 发送的消息内容
* @return {@link String}
* @author Error
*/
public void send(String msg) {
output.send(MessageBuilder.withPayload(msg).build());
}
}
Consumer代码
cloud:
stream:
binders: #在此处配置需要绑定的rabbitMQ的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #表示组件类型
environment: #设置rabbitMQ的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
input: #这个名字是一个通道名称
destination: xxxExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置
@EnableBinding(Sink.class) //定义消息的推送管道
public class MessageConsumer {
/**
* 消费消息
* @param message
* 接收的消息
* @author Error
*/
@StreamListener(Sink.INPUT)
public void consume(Message<String> message){
message.getPayload();
}
}
消息分组
消息分组
当做集群部署时,存在多个消费者,都会从MQ中消费消息,如果同一个消息被多个消费者消费就可能造成*重复消费*。这种情况发生时可以使用Stream中的消息分组来解决。
注意在Stream中处于同一个group中的多个消费者时竞争关系,就能够保证消息只会被其中一个消费者消费。*不同组的是可以全面消费的(重复消费)*。
分组设置
cloud:
stream:
bindings: #服务的整合处理
input: #这个名字是一个通道名称
destination: xxxExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置
group: groupA #设置消费者分组