Event Driven Architecture using AWS SQS and Spring boot
Fostering loose coupling and async data processing is the fundamental principle of Microservice Architecture. Event driven processing is an essential component of microservice design. Whether they are downstream or upstream of the application, event-driven architecture makes it straightforward for integrating many domain microservices.
Here we’ll go over a typical scenario and pattern that might help you build an Event Driven Microservice architecture using Spring Boot and AWS SQS.
If you want to store input data to a database and then raise an event for orchestration or notification, you’ll need to use the Pub/Sub pattern.
The usual circumstances to utilize these pattens are:
a. After processing the request, sending the email notification.
b. Integration with another domain service.
c. Retrying request and failure handling.
User Case: When a user is registered in the User Profile microservice, they are also registered with the Loyalty service, which is an external integration. The User microservice/registration API will receive input, validate the input data, and raise the USER_REGISTERED event once the data has been successfully saved. The Loyalty integration handler service will then monitor for this event in order to initiate the integration with the Loyalty endpoint for further processing.
Solution Diagram
Step by step Implementation
1. Installing Java
I am using Amazon Corretto 17, please find the link to install in the window.
2. Creating a IAM User and Assign a Role
Please follow the link to create a IAM User and assign the Programmatical access.
Please assign the Administrator access to IAM user or Role must have below permission to produce and listener to and from the SQS queue.
3. Configure the AWS CLI access.
Please download and install the AWS CLI refer the link. Please use below command to cofigure the Access Key and Secret key.
$ aws configure
AWS Access Key ID [None]: <<ACCESS-KEY>>
AWS Secret Access Key [None]: << SECRET-ACCESS-KEY >>
Default region name [None]: <<REGION>>
Default output format [None]: json
4. Creating a SQS Queue and DLQ
a. goto → Amazon SQS-> Queues
Click on Create Queue: Select Standard QUEUE and
Name = USER-PROFILE-QUEUE
b. Configuration
While the exact setup may vary from use case to use case, I’ve optimized it for message processing speed.
c. Encryption
Encryption must be enabled for the enterprise application.
d. Access policy
I have used the default policy.
e. Redrive allow policy
We will modify the configuration once the DLQ has been created.
f. Dead-letter queue
Use the default configuration.
6. Creating a DLQ
For messages that did not make it via the Standard Queue, we will use the DLQ. It may be used to deliver notification or logging the event when something fails, or to retry after an unsuccessful attempt. Please follow to create DLQ below:
a. Create a DLQ
b. DLQ Configuration
c. DLQ Redrive Policy
d. Dead letter Queue
7. Assigning the DLQ to QUEUE
This is a critical setting and will be executed when the conventional queue fails to convey the message. Messages will be shifted automatically from the standard queue to the DLQ following a specified number of attempts, as specified in step 3. The DLQ must be used for logging, notification, or retry attempts.
8. Spring boot configuration for Queue and DLQ
a. POC Project Creation: Create a Spring Boot Project using spring initializr. For complete details of dependency please refer to pom.xml.
b. SQS Configuration
The below configuration is used for setup SQS client.
Spring boot application.properties
cloud.aws.region.static=us-west-2
cloud.aws.region.auto=false
aws.queue.base-end-point=https://sqs.us-east-2.amazonaws.com/<<ACCOUNT_ID>>/
package com.example.sqs.sqspoc.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory;
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
@Configuration
public class SqsConfig {
@Bean
public SqsAsyncClient sqsAsyncClient() {
return SqsAsyncClient.builder().region(Region.US_WEST_2).build();
}
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory() {
return SqsMessageListenerContainerFactory
.builder()
.configure(options -> options
.acknowledgementMode(AcknowledgementMode.ON_SUCCESS)
)
.sqsAsyncClient(sqsAsyncClient())
.build();
}
}
c. Event Producer
This class will be used for publishing the event to the SQS queue. It takes the SQS queue name and the message as parameters and publishes the event to SQS.
configure the SQS end point in application.properties
package com.example.sqs.sqspoc.event;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.example.sqs.sqspoc.utils.JsonUtil;
import io.awspring.cloud.sqs.operations.SqsTemplate;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Component
@RequiredArgsConstructor
@Slf4j
public class SQSEventPublisher {
@Value("${aws.queue.base-end-point}")
private String baseSQSEndPoint;
private final SqsTemplate sqsTemplate;
public void publish(String queueName, Object msg) {
log.info("EventPublisher::Publisher: Start ");
String message = JsonUtil.convertToJson(msg);
sqsTemplate.send(buildSQSEndPoint(queueName),
MessageBuilder.withPayload(message).build());
log.info("EventPublisher::Publisher: End ");
}
private String buildSQSEndPoint(String queueName) {
log.debug("EventPublisher::buildSQSEndPoint: Start ");
StringBuilder sb= new StringBuilder();
sb.append(baseSQSEndPoint);
sb.append(queueName);
log.debug("EventPublisher::buildSQSEndPoint: End ");
return sb.toString();
}
}
e. Create User API and raise an Event
This is the user registration API implementation, where the controller accepts the User Profile request, calls the service class to save it, and on success raises the event for further processing.
package com.example.sqs.sqspoc.user;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import com.example.sqs.sqspoc.event.SQSEventPublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@RequiredArgsConstructor
@RestController
@Slf4j
public class UserProfileController {
private final SQSEventPublisher eventPublisher;
private final UserProfileService userProfileService;
@PostMapping("/users")
public String saveUserProfile(@RequestBody UserProfileRequest userProfileRequest) {
log.info("UserProfileController::saveUserProfile:Start");
userProfileService.saveUser(userProfileRequest);
eventPublisher.publish("USER-PROFILE-QUEUE", userProfileRequest);
log.info("UserProfileController::saveUserProfile:end");
return "Done!";
}
}
f. Event Listener
The event generated by the User profile registration API will be handled within this class. The event will be retrieved from the SQS every 5 seconds and delegated to the Event handler class for further processing. If the event cannot be processed, a retry will be initiated; after a specified number of attempts, the event will be transferred to the DLQ and the Event DLQ Listener will assign the task to the DLQ event handler.
package com.example.sqs.sqspoc.event;
import org.springframework.stereotype.Component;
import com.example.sqs.sqspoc.user.UserProfileDLQEventHandler;
import com.example.sqs.sqspoc.user.UserProfileEventHandler;
import io.awspring.cloud.sqs.annotation.SqsListener;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
@RequiredArgsConstructor
public class UserProfileEventListener {
private final UserProfileEventHandler userProfileEventHandler;
private final UserProfileDLQEventHandler userProfileDLQEventHandler;
@SqsListener(value = "USER-PROFILE-QUEUE", pollTimeoutSeconds = "5" )
public void queueListener(String message) {
log.info("UserProfileEventListener::queueListener:Start");
userProfileEventHandler.handle(message);
log.info("UserProfileEventListener::queueListener:END");
}
@SqsListener(value = "USER-PROFILE-QUEUE-DLQ", pollTimeoutSeconds = "5" )
public void dlqListener(String message) {
log.info("UserProfileEventListener::dlqListener:Start");
userProfileDLQEventHandler.handle(message);
log.info("UserProfileEventListener::dlqListener:END");
}
}
g. Execution log
The execution log for the process of going from the User Profile registration to the event listers is included below.
- Normal Flow
2. Retry and DLQ Flow
Please modify the code as below to test the same:
In UserProfileEventListener.java, modify the code and run the application to test the retry and DLQ flow.
@SqsListener(value = "USER-PROFILE-QUEUE", pollTimeoutSeconds = "5" )
public void queueListener(String message) {
log.info("UserProfileEventListener::queueListener:Start");
if(true) {
throw new RuntimeException("Testing RETRY and DLQ.");
}
userProfileEventHandler.handle(message);
log.info("UserProfileEventListener::queueListener:END");
}
Summary
We have learned the step-by-step implementation of SQS and Spring Boot in order to construct an event-driven architecture in this blog. When a domain-driven architecture is adhered to and two or more services are invoked, the event driver architecture is a critical component of the Microservice Architecture. In addition to facilitating loose coupling, this reduces synchronous integration among the microservice. Additional use cases may be added to this example. The key benefit of using SQS in this case is that it is a managed service that offers simple configuration and scalability. No operational assistance is necessary since it is a serverless service.