5. 消息传递

Spring Cloud AWS 提供了 Amazon SQSAmazon SNS 集成,可简化通过 SQS 或 SNS 发布和消费消息的过程。虽然 SQS 完全依赖于 Spring 4.0 引入的消息 API,但 SNS 仅部分实现了该 API,因为接收部分需以不同方式处理推送通知。spring-doc.cadn.net.cn

5.1. 配置消息传递

在使用和配置消息支持之前,应用程序必须将相应的模块依赖项添加到 Maven 配置中。Spring Cloud AWS 消息支持作为一个独立的模块提供,以允许模块化地使用各个模块。spring-doc.cadn.net.cn

5.1.1. Maven 依赖配置

Spring Cloud AWS 消息模块作为一个独立模块提供,可以通过以下依赖声明进行导入:spring-doc.cadn.net.cn

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-aws-messaging</artifactId>
    <version>{spring-cloud-version}</version>
</dependency>

5.2. SQS 支持

Amazon SQS 是 Amazon Web Services 平台上的托管消息服务,提供基于队列的点对点通信。与 JMS 或其他消息服务相比,Amazon SQS 具有若干特性及限制,这些应予以考虑。spring-doc.cadn.net.cn

  • Amazon SQS 仅允许 String 个负载,因此任何 Object 都必须转换为字符串表示形式。Spring Cloud AWS 专门支持通过将 Java 对象转换为 JSON 来传输 Amazon SQS 消息。spring-doc.cadn.net.cn

  • Amazon SQS 不支持事务,因此消息可能会被重复获取。应用程序必须以幂等的方式编写,以便能够两次接收同一消息。spring-doc.cadn.net.cn

  • Amazon SQS 的每条消息最大尺寸为 256KB,因此更大的消息将无法成功发送。spring-doc.cadn.net.cn

5.2.1. 发送消息

QueueMessagingTemplate 包含许多便捷方法用于发送消息。其中包含使用 QueueMessageChannel 对象指定目标的发送方法,以及使用字符串指定目标的方法(该字符串将针对 SQS API 进行解析)。不带目标参数的发送方法将使用默认目标。spring-doc.cadn.net.cn

import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;

public class SqsQueueSender {

    private final QueueMessagingTemplate queueMessagingTemplate;

    @Autowired
    public SqsQueueSender(AmazonSQSAsync amazonSQSAsync) {
        this.queueMessagingTemplate = new QueueMessagingTemplate(amazonSQSAsync);
    }

    public void send(String message) {
        this.queueMessagingTemplate.send("physicalQueueName", MessageBuilder.withPayload(message).build());
    }
}

本示例使用 MessageBuilder 类来创建一个带有字符串负载的消息。QueueMessagingTemplate 是通过将对 AmazonSQSAsync 客户端的引用传递给其构造函数而构建的。在 send 方法中指定的目标是一个字符串值,该值必须与 AWS 中定义的队列名称相匹配。此值将在运行时由 Amazon SQS 客户端解析。可选地,可以将一个 ResourceIdResolver 实现传递给 QueueMessagingTemplate 的构造函数,以便在 CloudFormation 堆栈内部运行时通过逻辑名称解析资源(有关资源名称解析的更多信息,请参阅 管理云环境)。spring-doc.cadn.net.cn

使用消息命名空间,可以在 XML 配置文件中定义一个 QueueMessagingTemplatespring-doc.cadn.net.cn

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aws-context="http://www.springframework.org/schema/cloud/aws/context"
    xmlns:aws-messaging="http://www.springframework.org/schema/cloud/aws/messaging"
    xmlns="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        https://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/cloud/aws/context
        http://www.springframework.org/schema/cloud/aws/context/spring-cloud-aws-context.xsd
        http://www.springframework.org/schema/cloud/aws/messaging
        http://www.springframework.org/schema/cloud/aws/messaging/spring-cloud-aws-messaging">

    <aws-context:context-credentials>
        <aws-context:instance-profile-credentials />
    </aws-context:context-credentials>

    <aws-messaging:queue-messaging-template id="queueMessagingTemplate" />

</beans>

在本示例中,消息命名空间处理器会构建一个新的 QueueMessagingTemplate。根据所提供的凭据,AmazonSQSAsync 客户端会自动创建并传递给模板的构造函数。如果应用程序运行在已配置的 CloudFormation 堆栈内,则会将 ResourceIdResolver 传递给构造函数(有关资源名称解析的更多信息,请参阅 管理云环境)。spring-doc.cadn.net.cn

使用消息转换器

为了便于发送领域模型对象,QueueMessagingTemplate 提供了多种发送方法,这些方法以 Java 对象作为参数,用于设置消息的数据内容。QueueMessagingTemplate 中的重载方法 convertAndSend()receiveAndConvert() 将转换过程委托给 MessageConverter 接口的一个实例。该接口定义了一个简单的契约,用于在 Java 对象与 SQS 消息之间进行转换。默认实现 SimpleMessageConverter 仅在消息有效负载与目标类型匹配时,简单地将其解包(unwrap)。通过使用此转换器,您和您的应用程序代码可以专注于通过 SQS 发送或接收的业务对象,而无需关心其在 SQS 消息中如何表示。spring-doc.cadn.net.cn

由于 SQS 一次只能发送 String 个负载,因此默认转换器 SimpleMessageConverter 仅应用于发送 String 个负载。对于更复杂对象,应使用自定义转换器,例如由消息命名空间处理器创建的转换器。spring-doc.cadn.net.cn

建议使用 XML 消息命名空间来创建 QueueMessagingTemplate,因为它会设置一个更高级的 MessageConverter,当 Jackson 在类路径上时,该命名空间可将对象转换为 JSON。spring-doc.cadn.net.cn

<aws-messaging:queue-messaging-template id="queueMessagingTemplate" />
this.queueMessagingTemplate.convertAndSend("queueName", new Person("John, "Doe"));

在此示例中,使用消息命名空间创建了一个 QueueMessagingTemplateconvertAndSend 方法使用已配置的 MessageConverter 将有效载荷 Person 进行转换并发送该消息。spring-doc.cadn.net.cn

5.2.2. 接收消息

接收 SQS 消息有两种方式,一是使用 receive 方法的 QueueMessagingTemplate,二是通过注解驱动的监听器端点。后者是接收消息更为便捷的方式。spring-doc.cadn.net.cn

Person person = this.queueMessagingTemplate.receiveAndConvert("queueName", Person.class);

在此示例中,QueueMessagingTemplate 将从 SQS 队列中获取一条消息,并将其转换为作为参数传递的目标类。spring-doc.cadn.net.cn

5.2.3. 注解驱动的监听器端点

基于注解的监听器端点是监听 SQS 消息最简单的方式。只需使用 MessageMapping 注解方法,QueueMessageHandler 将把消息路由到已注解的方法上。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener />
@SqsListener("queueName")
public void queueListener(Person person) {
    // ...
}

在此示例中,启动了一个队列监听容器,该容器轮询通过 queueName 传递给 MessageMapping 注解的 SQS 队列。传入的消息会被转换为目标类型,随后调用标注的方法 queueListenerspring-doc.cadn.net.cn

除了有效载荷外,还可以通过 @Header@Headers 注解在监听器方法中注入头信息。@Header 用于注入特定的头信息值,而 @Headers 则注入一个包含所有头信息的 Map<String, String>spring-doc.cadn.net.cn

仅支持随 SQS 消息一起发送的 标准消息属性。当前不支持自定义属性。spring-doc.cadn.net.cn

除了提供的参数解析器外,还可以通过 aws-messaging:annotation-driven-queue-listener 元素使用 aws-messaging:argument-resolvers 属性注册自定义的参数解析器(见下方示例)。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener>
    <aws-messaging:argument-resolvers>
        <bean class="org.custom.CustomArgumentResolver" />
    </aws-messaging:argument-resolvers>
</aws-messaging:annotation-driven-queue-listener>

默认情况下,SimpleMessageListenerContainer 会创建一个具有核心线程池大小和最大线程池大小计算值的 ThreadPoolTaskExecutor。核心线程池大小被设置为队列数量的两倍,而最大线程池大小则通过将队列数量乘以 maxNumberOfMessages 字段的值来获得。如果这些默认值无法满足应用程序的需求,可以通过 task-executor 属性设置自定义任务执行器(见下方示例)。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener task-executor="simpleTaskExecutor" />
消息回复

消息监听器方法可以使用 @SendTo 进行注解,以将其返回值发送到另一个通道。SendToHandlerMethodReturnValueHandler 使用在 aws-messaging:annotation-driven-queue-listener 元素上定义的消息模板集来发送返回值。该消息模板必须实现 DestinationResolvingMessageSendingOperations 接口。spring-doc.cadn.net.cn

<aws-messaging:annotation-driven-queue-listener send-to-message-template="queueMessagingTemplate"/>
@SqsListener("treeQueue")
@SendTo("leafsQueue")
public List<Leaf> extractLeafs(Tree tree) {
    // ...
}

在本示例中,extractLeafs 方法将接收来自 treeQueue 的消息,然后返回一个由 Leaf 组成的 List,该返回值将被发送至 leafsQueue。请注意,在 aws-messaging:annotation-driven-queue-listener XML 元素上存在一个名为 send-to-message-template 的属性,它指定了用于发送消息监听器方法返回值的通信模板为 QueueMessagingTemplatespring-doc.cadn.net.cn

处理异常

在使用 @SqsListener 注解的方法内部抛出的异常,可以通过使用 @MessageExceptionHandler 注解的方法进行处理。spring-doc.cadn.net.cn

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

    @SqsListener("queueName")
    void handle(String message) {
        ...
        throw new MyException("something went wrong");
    }

    @MessageExceptionHandler(MyException.class)
    void handleException(MyException e) {
        ...
    }
}

5.2.4. 简单消息监听容器工厂

可以通过 Java 创建类型为 SimpleMessageListenerContainerFactory 的 Bean 来配置 SimpleMessageListenerContainerspring-doc.cadn.net.cn

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setAutoStartup(false);
    factory.setMaxNumberOfMessages(5);
    // ...

    return factory;
}

5.2.5. 使用 Amazon SQS 消费 AWS 事件消息

也可以通过 SQS 消息监听器接收 AWS 生成的事件消息。由于 AWS 消息不包含 MIME 类型头,因此必须将 Jackson 消息转换器配置为 strictContentTypeMatch 属性为 false,以支持在没有正确 MIME 类型的情况下解析消息。spring-doc.cadn.net.cn

下面的代码展示了使用 QueueMessageHandlerFactory 配置消息转换器,并重新配置 MappingJackson2MessageConverterspring-doc.cadn.net.cn

@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
    QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();

    //set strict content type match to false
    messageConverter.setStrictContentTypeMatch(false);
    factory.setArgumentResolvers(Collections.<HandlerMethodArgumentResolver>singletonList(new PayloadArgumentResolver(messageConverter)));
    return factory;
}

在上述配置下,可以在 @SqsListener 注解的方法中接收 S3 存储桶(以及其他事件通知,如 Elastic Transcoder 消息)的事件通知,如下所示。spring-doc.cadn.net.cn

@SqsListener("testQueue")
public void receive(S3EventNotification s3EventNotificationRecord) {
    S3EventNotification.S3Entity s3Entity = s3EventNotificationRecord.getRecords().get(0).getS3();
}

5.3. SNS 支持

Amazon SNS 是一种发布-订阅消息系统,允许客户端将通知发布到特定主题。其他感兴趣的客户端可使用 HTTP/HTTPS、电子邮件或 Amazon SQS 队列等不同协议进行订阅,以接收消息。spring-doc.cadn.net.cn

下图显示了一个典型的 Amazon SNS 架构示例。spring-doc.cadn.net.cn

SNS Overview

Spring Cloud AWS 通过提供支持发送通知的 NotificationMessagingTemplate 以及使用基于 Spring Web MVC 的 @Controller 编程模型,通过 HTTP/HTTPS 端点接收通知,从而支持 Amazon SNS。Amazon SQS 基于的订阅可与 Spring Cloud AWS 消息模块所提供的注解驱动消息支持一起使用。spring-doc.cadn.net.cn

5.3.1. 发送消息

NotificationMessagingTemplate 包含两个便捷方法用于发送通知。第一个方法通过 String 指定目标,该目标将针对 SNS API 进行解析。第二个方法不接受目标参数,并使用默认目标。所有在 MessageSendingOperations 上可用的常规发送方法均已实现,但因需将主题作为头信息传递,因此在发送通知时不够便捷。spring-doc.cadn.net.cn

目前仅可使用 String 有效负载通过 NotificationMessagingTemplate 发送,因为这是 SNS API 所期望的类型。spring-doc.cadn.net.cn

import com.amazonaws.services.sns.AmazonSNS;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;

public class SnsNotificationSender {

    private final NotificationMessagingTemplate notificationMessagingTemplate;

    @Autowired
    public SnsNotificationSender(AmazonSNS amazonSns) {
        this.notificationMessagingTemplate = new NotificationMessagingTemplate(amazonSns);
    }

    public void send(String subject, String message) {
        this.notificationMessagingTemplate.sendNotification("physicalTopicName", message, subject);
    }
}

本示例通过将 NotificationMessagingTemplate 客户端作为参数传递来构造一个新的 NotificationMessagingTemplate。在 send 方法中,使用便捷的 sendNotification 方法向 SNS 主题发送一条包含 subjectmessage 消息。在 sendNotification 方法中的目标是一个字符串值,该值必须与 AWS 中定义的主题名称相匹配。此值由 Amazon SNS 客户端在运行时解析。可选地,可将一个 ResourceIdResolver 实现传递给 NotificationMessagingTemplate 构造函数,以便在 CloudFormation 堆栈内运行时根据逻辑名称解析资源。(有关资源名称解析的更多信息,请参阅 管理云环境。)spring-doc.cadn.net.cn

建议使用 XML 消息命名空间来创建 NotificationMessagingTemplate,这样可自动配置 SNS 客户端以设置默认转换器。spring-doc.cadn.net.cn

<aws-messaging:notification-messaging-template id="notificationMessagingTemplate" />

5.3.2. 注解驱动的 HTTP 通知端点

SNS 支持多种端点类型(SQS、邮件、HTTP、HTTPS),Spring Cloud AWS 提供对 HTTP(S) 端点的支持。<br/>SNS 向 HTTP 主题监听器端点发送三种类型的请求,每种请求都提供了相应的注解:spring-doc.cadn.net.cn

HTTP 端点基于 Spring MVC 控制器。Spring Cloud AWS 添加了一些自定义参数解析器,用于从通知请求中提取消息和主题。spring-doc.cadn.net.cn

@Controller
@RequestMapping("/topicName")
public class NotificationTestController {

    @NotificationSubscriptionMapping
    public void handleSubscriptionMessage(NotificationStatus status) throws IOException {
        //We subscribe to start receive the message
        status.confirmSubscription();
    }

    @NotificationMessageMapping
    public void handleNotificationMessage(@NotificationSubject String subject, @NotificationMessage String message) {
        // ...
    }

    @NotificationUnsubscribeConfirmationMapping
    public void handleUnsubscribeMessage(NotificationStatus status) {
        //e.g. the client has been unsubscribed and we want to "re-subscribe"
        status.confirmSubscription();
    }
}

目前无法在方法级别定义映射URL,因此必须在类型级别进行配置,并且必须包含端点的完整路径。spring-doc.cadn.net.cn

此示例创建了一个新的 Spring MVC 控制器,其中包含三个方法,用于处理上述三种请求。为了解析 handleNotificationMessage 方法的参数,必须注册一个自定义参数解析器。下面列出了 XML 配置。spring-doc.cadn.net.cn

<mvc:annotation-driven>
    <mvc:argument-resolvers>
        <ref bean="notificationResolver" />
    </mvc:argument-resolvers>
</mvc:annotation-driven>

<aws-messaging:notification-argument-resolver id="notificationResolver" />

元素 aws-messaging:notification-argument-resolver 注册了三个参数解析器:NotificationStatusHandlerMethodArgumentResolverNotificationMessageHandlerMethodArgumentResolverNotificationSubjectHandlerMethodArgumentResolverspring-doc.cadn.net.cn

5.4. 使用 CloudFormation

Amazon SQS 队列和 SNS 主题可以在堆栈中进行配置,然后供应用程序使用。Spring Cloud AWS 还支持通过逻辑名称查找堆栈中配置的队列和主题,并将其解析为物理名称。下面的示例展示了在 CloudFormation 模板中配置一个 SNS 主题和 SQS 队列。spring-doc.cadn.net.cn

"LogicalQueueName": {
    "Type": "AWS::SQS::Queue",
    "Properties": {
    }
},
"LogicalTopicName": {
    "Type": "AWS::SNS::Topic",
    "Properties": {
    }
}

逻辑名称 LogicalQueueNameLogicalTopicName 随后可在配置和应用程序中如以下所示使用:spring-doc.cadn.net.cn

<aws-messaging:queue-messaging-template default-destination="LogicalQueueName" />

<aws-messaging:notification-messaging-template default-destination="LogicalTopicName" />
@SqsListener("LogicalQueueName")
public void receiveQueueMessages(Person person) {
    // Logical names can also be used with messaging templates
    this.notificationMessagingTemplate.sendNotification("anotherLogicalTopicName", "Message", "Subject");
}

在使用上述示例中的逻辑名称时,可以在不同环境中创建堆栈,而无需对应用程序内部进行任何配置或代码更改。spring-doc.cadn.net.cn