5. 消息传递
Spring Cloud AWS 提供了 Amazon SQS 和 Amazon SNS 集成,可简化通过 SQS 或 SNS 发布和消费消息的过程。虽然 SQS 完全依赖于 Spring 4.0 引入的消息 API,但 SNS 仅部分实现了该 API,因为接收部分需以不同方式处理推送通知。
5.1. 配置消息传递
在使用和配置消息支持之前,应用程序必须将相应的模块依赖项添加到 Maven 配置中。Spring Cloud AWS 消息支持作为一个独立的模块提供,以允许模块化地使用各个模块。
5.1.1. Maven 依赖配置
Spring Cloud AWS 消息模块作为一个独立模块提供,可以通过以下依赖声明进行导入:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-messaging</artifactId>
<version>{spring-cloud-version}</version>
</dependency>
5.2. SQS 支持
Amazon SQS 是 Amazon Web Services 平台上的托管消息服务,提供基于队列的点对点通信。与 JMS 或其他消息服务相比,Amazon SQS 具有若干特性及限制,这些应予以考虑。
-
Amazon SQS 仅允许
String个负载,因此任何Object都必须转换为字符串表示形式。Spring Cloud AWS 专门支持通过将 Java 对象转换为 JSON 来传输 Amazon SQS 消息。 -
Amazon SQS 不支持事务,因此消息可能会被重复获取。应用程序必须以幂等的方式编写,以便能够两次接收同一消息。
-
Amazon SQS 的每条消息最大尺寸为 256KB,因此更大的消息将无法成功发送。
5.2.1. 发送消息
类 QueueMessagingTemplate 包含许多便捷方法用于发送消息。其中包含使用 QueueMessageChannel 对象指定目标的发送方法,以及使用字符串指定目标的方法(该字符串将针对 SQS API 进行解析)。不带目标参数的发送方法将使用默认目标。
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;
public class SqsQueueSender {
private final QueueMessagingTemplate queueMessagingTemplate;
@Autowired
public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
}
public void send(String message) {
this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
}
}
本示例使用 MessageBuilder 类来创建一个带有字符串负载的消息。QueueMessagingTemplate 是通过将对 AmazonSQSAsync 客户端的引用传递给其构造函数而构建的。在 send 方法中指定的目标是一个字符串值,该值必须与 AWS 中定义的队列名称相匹配。此值将在运行时由 Amazon SQS 客户端解析。可选地,可以将一个 ResourceIdResolver 实现传递给 QueueMessagingTemplate 的构造函数,以便在 CloudFormation 堆栈内部运行时通过逻辑名称解析资源(有关资源名称解析的更多信息,请参阅 管理云环境)。
使用消息命名空间,可以在 XML 配置文件中定义一个 QueueMessagingTemplate。
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
xmlns="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/cloud/aws/context
http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
http://www.springframework.org/schema/cloud/aws/messaging
http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging">
<aws-context:context-credentials>
<aws-context:instance-profile-credentials />
</aws-context:context-credentials>
<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
</beans>
在本示例中,消息命名空间处理器会构建一个新的 QueueMessagingTemplate。根据所提供的凭据,AmazonSQSAsync 客户端会自动创建并传递给模板的构造函数。如果应用程序运行在已配置的 CloudFormation 堆栈内,则会将 ResourceIdResolver 传递给构造函数(有关资源名称解析的更多信息,请参阅 管理云环境)。
使用消息转换器
为了便于发送领域模型对象,QueueMessagingTemplate 提供了多种发送方法,这些方法以 Java 对象作为参数,用于设置消息的数据内容。QueueMessagingTemplate 中的重载方法 convertAndSend() 和 receiveAndConvert() 将转换过程委托给 MessageConverter 接口的一个实例。该接口定义了一个简单的契约,用于在 Java 对象与 SQS 消息之间进行转换。默认实现 SimpleMessageConverter 仅在消息有效负载与目标类型匹配时,简单地将其解包(unwrap)。通过使用此转换器,您和您的应用程序代码可以专注于通过 SQS 发送或接收的业务对象,而无需关心其在 SQS 消息中如何表示。
|
由于 SQS 一次只能发送 |
建议使用 XML 消息命名空间来创建 QueueMessagingTemplate,因为它会设置一个更高级的 MessageConverter,当 Jackson 在类路径上时,该命名空间可将对象转换为 JSON。
<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John, "Doe"));
在此示例中,使用消息命名空间创建了一个 QueueMessagingTemplate。convertAndSend 方法使用已配置的 MessageConverter 将有效载荷 Person 进行转换并发送该消息。
5.2.2. 接收消息
接收 SQS 消息有两种方式,一是使用 receive 方法的 QueueMessagingTemplate,二是通过注解驱动的监听器端点。后者是接收消息更为便捷的方式。
Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);
在此示例中,QueueMessagingTemplate 将从 SQS 队列中获取一条消息,并将其转换为作为参数传递的目标类。
5.2.3. 注解驱动的监听器端点
基于注解的监听器端点是监听 SQS 消息最简单的方式。只需使用 MessageMapping 注解方法,QueueMessageHandler 将把消息路由到已注解的方法上。
<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
// ...
}
在此示例中,启动了一个队列监听容器,该容器轮询通过 queueName 传递给 MessageMapping 注解的 SQS 队列。传入的消息会被转换为目标类型,随后调用标注的方法 queueListener。
除了有效载荷外,还可以通过 @Header 或 @Headers 注解在监听器方法中注入头信息。@Header 用于注入特定的头信息值,而 @Headers 则注入一个包含所有头信息的 Map<String, String>。
仅支持随 SQS 消息一起发送的 标准消息属性。当前不支持自定义属性。
除了提供的参数解析器外,还可以通过 aws-messaging:annotation-driven-queue-listener 元素使用 aws-messaging:argument-resolvers 属性注册自定义的参数解析器(见下方示例)。
<aws-messaging:annotation-driven-queue-listener>
<aws-messaging:argument-resolvers>
<bean class="org.custom.CustomArgumentResolver" />
</aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>
默认情况下,SimpleMessageListenerContainer 会创建一个具有核心线程池大小和最大线程池大小计算值的 ThreadPoolTaskExecutor。核心线程池大小被设置为队列数量的两倍,而最大线程池大小则通过将队列数量乘以 maxNumberOfMessages 字段的值来获得。如果这些默认值无法满足应用程序的需求,可以通过 task-executor 属性设置自定义任务执行器(见下方示例)。
<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
消息回复
消息监听器方法可以使用 @SendTo 进行注解,以将其返回值发送到另一个通道。SendToHandlerMethodReturnValueHandler 使用在 aws-messaging:annotation-driven-queue-listener 元素上定义的消息模板集来发送返回值。该消息模板必须实现 DestinationResolvingMessageSendingOperations 接口。
<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
// ...
}
在本示例中,extractLeafs 方法将接收来自 treeQueue 的消息,然后返回一个由 Leaf 组成的 List,该返回值将被发送至 leafsQueue。请注意,在 aws-messaging:annotation-driven-queue-listener XML 元素上存在一个名为 send-to-message-template 的属性,它指定了用于发送消息监听器方法返回值的通信模板为 QueueMessagingTemplate。
处理异常
在使用 @SqsListener 注解的方法内部抛出的异常,可以通过使用 @MessageExceptionHandler 注解的方法进行处理。
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;
@Component
public class MyMessageHandler {
@SqsListener("queueName")
void handle(String message) {
...
throw new MyException("something went wrong");
}
@MessageExceptionHandler(MyException.class)
void handleException(MyException e) {
...
}
}
5.2.4. 简单消息监听容器工厂
可以通过 Java 创建类型为 SimpleMessageListenerContainerFactory 的 Bean 来配置 SimpleMessageListenerContainer。
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setAutoStartup(false);
factory.setMaxNumberOfMessages(5);
// ...
return factory;
}
5.2.5. 使用 Amazon SQS 消费 AWS 事件消息
也可以通过 SQS 消息监听器接收 AWS 生成的事件消息。由于 AWS 消息不包含 MIME 类型头,因此必须将 Jackson 消息转换器配置为 strictContentTypeMatch 属性为 false,以支持在没有正确 MIME 类型的情况下解析消息。
下面的代码展示了使用 QueueMessageHandlerFactory 配置消息转换器,并重新配置 MappingJackson2MessageConverter。
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
//set strict content type match to false
messageConverter.setStrictContentTypeMatch(false);
factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
return factory;
}
在上述配置下,可以在 @SqsListener 注解的方法中接收 S3 存储桶(以及其他事件通知,如 Elastic Transcoder 消息)的事件通知,如下所示。
@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}
5.3. SNS 支持
Amazon SNS 是一种发布-订阅消息系统,允许客户端将通知发布到特定主题。其他感兴趣的客户端可使用 HTTP/HTTPS、电子邮件或 Amazon SQS 队列等不同协议进行订阅,以接收消息。
下图显示了一个典型的 Amazon SNS 架构示例。
Spring Cloud AWS 通过提供支持发送通知的 NotificationMessagingTemplate 以及使用基于 Spring Web MVC 的 @Controller 编程模型,通过 HTTP/HTTPS 端点接收通知,从而支持 Amazon SNS。Amazon SQS 基于的订阅可与 Spring Cloud AWS 消息模块所提供的注解驱动消息支持一起使用。
5.3.1. 发送消息
该 NotificationMessagingTemplate 包含两个便捷方法用于发送通知。第一个方法通过 String 指定目标,该目标将针对 SNS API 进行解析。第二个方法不接受目标参数,并使用默认目标。所有在 MessageSendingOperations 上可用的常规发送方法均已实现,但因需将主题作为头信息传递,因此在发送通知时不够便捷。
|
目前仅可使用 |
import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
public class SnsNotificationSender {
private final NotificationMessagingTemplate notificationMessagingTemplate;
@Autowired
public SnsNotificationSender(AmazonSNS amazonSns) {
this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
}
public void send(String subject, String message) {
this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
}
}
本示例通过将 NotificationMessagingTemplate 客户端作为参数传递来构造一个新的 NotificationMessagingTemplate。在 send 方法中,使用便捷的 sendNotification 方法向 SNS 主题发送一条包含 subject 的 message 消息。在 sendNotification 方法中的目标是一个字符串值,该值必须与 AWS 中定义的主题名称相匹配。此值由 Amazon SNS 客户端在运行时解析。可选地,可将一个 ResourceIdResolver 实现传递给 NotificationMessagingTemplate 构造函数,以便在 CloudFormation 堆栈内运行时根据逻辑名称解析资源。(有关资源名称解析的更多信息,请参阅 管理云环境。)
建议使用 XML 消息命名空间来创建 NotificationMessagingTemplate,这样可自动配置 SNS 客户端以设置默认转换器。
<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />
5.3.2. 注解驱动的 HTTP 通知端点
SNS 支持多种端点类型(SQS、邮件、HTTP、HTTPS),Spring Cloud AWS 提供对 HTTP(S) 端点的支持。<br/>SNS 向 HTTP 主题监听器端点发送三种类型的请求,每种请求都提供了相应的注解:
-
订阅请求 →
@NotificationSubscriptionMapping -
通知请求 →
@NotificationMessageMapping -
退订请求 →
@NotificationUnsubscribeMapping
HTTP 端点基于 Spring MVC 控制器。Spring Cloud AWS 添加了一些自定义参数解析器,用于从通知请求中提取消息和主题。
@Controller
@RequestMapping("/topicName")
public class NotificationTestController {
@NotificationSubscriptionMapping
public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
//We subscribe to start receive the message
status.confirmSubscription();
}
@NotificationMessageMapping
public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
// ...
}
@NotificationUnsubscribeConfirmationMapping
public void handleUnsubscribeMessage(NotificationStatus status) {
//e.g. the client has been unsubscribed and we want to "re-subscribe"
status.confirmSubscription();
}
}
|
目前无法在方法级别定义映射URL,因此必须在类型级别进行配置,并且必须包含端点的完整路径。 |
此示例创建了一个新的 Spring MVC 控制器,其中包含三个方法,用于处理上述三种请求。为了解析 handleNotificationMessage 方法的参数,必须注册一个自定义参数解析器。下面列出了 XML 配置。
<mvc:annotation-driven>
<mvc:argument-resolvers>
<ref bean="notificationResolver" />
</mvc:argument-resolvers>
</mvc:annotation-driven>
<aws-messaging:notification-argument-resolver id="notificationResolver" />
元素 aws-messaging:notification-argument-resolver 注册了三个参数解析器:NotificationStatusHandlerMethodArgumentResolver、NotificationMessageHandlerMethodArgumentResolver 和 NotificationSubjectHandlerMethodArgumentResolver。
5.4. 使用 CloudFormation
Amazon SQS 队列和 SNS 主题可以在堆栈中进行配置,然后供应用程序使用。Spring Cloud AWS 还支持通过逻辑名称查找堆栈中配置的队列和主题,并将其解析为物理名称。下面的示例展示了在 CloudFormation 模板中配置一个 SNS 主题和 SQS 队列。
"LogicalQueueName": {
"Type": "AWS::SQS::Queue",
"Properties": {
}
},
"LogicalTopicName": {
"Type": "AWS::SNS::Topic",
"Properties": {
}
}
逻辑名称 LogicalQueueName 和 LogicalTopicName 随后可在配置和应用程序中如以下所示使用:
<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />
<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
// Logical names can also be used with messaging templates
this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}
在使用上述示例中的逻辑名称时,可以在不同环境中创建堆栈,而无需对应用程序内部进行任何配置或代码更改。