[AWS] AWS SNS와 SQS를 사용한 이벤트 처리 구현하기 (w.Spring Boot)
'AWS로 구현하는 MAS와 컨테이너 오캐스트레이션' 강의를
블로그를 통해 구매하시는 분들에게만 10%할인 중입니다.
이미지를 클릭하고, 아래 쿠폰번호를 입력해 주세요!
16861-259843d6c2d7
Spring 환경 (gradle)
plugins {
id("org.springframework.boot") version "2.7.2"
id("io.spring.dependency-management") version "1.0.12.RELEASE"
kotlin("jvm") version "1.6.21"
kotlin("plugin.spring") version "1.6.21"
kotlin("plugin.jpa") version "1.6.21"
kotlin("kapt") version "1.6.21"
}
...
extra["springCloudVersion"] = "2021.0.3"
...
dependencies {
implementation(platform("software.amazon.awssdk:bom:2.20.54"))
implementation("software.amazon.awssdk:sns") // SNS 모듈 추가
implementation("software.amazon.awssdk:sqs") // SQS 모듈 추가
}
AWS SNS와 SQS
SNS 와 SQS 는 AWS 에서 제공하는 메시지 전달 서비스로, 각각의 특징을 활용해 효율적이고 안정적인 시스템 구축이 가능하다.
AWS 가 제공해준다는 이유만으로 믿을 수 있는 내구성과 확장성만으로도 사용할 이유가 충분하고 본다.
AWS SNS
- Pub/Sub 모델 - publisher-subscriber 패턴을 사용해 여러 구독자에게 동시에 메시지를 전달할 수 있다. 이 특징을 통해 다양한 애플리케이션 및 서비스와 결합하여 복잡한 시스템 구축 가능.
- 여러 프로토콜 지원 - 다양한 프로토콜을 지원한다. 이메일, SMS, AWS Lambda, AWS SQS 등에 메시지를 전송할 수 있다.
- 확장성 - 자동으로 메시지 전달처리를 확장해주기 때문에 시스템이 성장할 때 별도의 구성이나 관리가 필요하지 않다.
- 높은 내구성 - 여러 가용 영역에 메시지가 저장되어 서비스 중단이나 데이터 손실이 거의 없다.
AWS SQS
- 분산 시스템 간 메시지 전달 - 분산되어있는 서비스 간 메시지를 안전하게 전달하고, 실패한 메시지는 자동으로 재시도한다. 이로 인해 분산 서비스 간의 통신을 안정적으로 유지할 수 있다.
- 비동기 처리 - 메시지 큐를 이요해 작업을 비동기적으로 처리할 수 있어 시스템의 응답시간을 개선하고, 다양한 작업을 동시에 처리할 수 있다.
- 확장성 - 자동으로 큐를 확장하고 관리해 주기 때문에 시스템 성장 시 별도의 구성이나 관리가 필요하지 않다.
- 높은 내구성 - SNS 와 마찬가지로 여러 가용 영역에 메시지가 저장되어 서비스 중단이나 데이터 손실이 거의 없다.
Spring Boot를 사용해 SNS와 SQS를 통한 이벤트 처리 구현
1. SNS 주제 생성
AWS SNS 콘솔 -> 좌측 탭의 '주제' -> 우측 상단의 '주제 생성' 버튼 클릭하여 해당 화면으로 이동
- 유형
- 표준 주제
- 고성능, 확장이 가능하며 비용 효율적인 메세징 솔루션을 제공한다. 메시지의 순서는 보장되지 않아 나중에 보낸 메시지가 먼저 도착할 수 있고 오류 등의 이유로 인해 메시지가 여러번 보내질 경우 중복된 메시지가 전달될 수 있다.
- 알림 및 경고, 비동기 작업, 이벤트 기반 아키텍쳐 등에 사용
- FIFO 주제
- 순서를 보장하는 메세징 솔루션을 제공하고 중복 메시지에 대한 처리가 가능해 오류로 인해 메시지가 여러번 보내져도 메시지는 한번만 전달된다. 하지만 FIFO 주제는 처리량이 제한되어있고(초당 300개) 이러한 제한 및 기능으로 성능이 다소 떨어질 수 있다.
- 순서 보장 작업, 트랜잭션 처리, 워크플로우 관리 등에 사용
- 표준 주제
- 이름 - 주제의 이름
- 표시 이름 - 이메일, 이메일-JSON 프로토콜 사용할 때 메시지의 발신자 이름
- 콘텐츠 기반 메시지 중복 제거(FIFO 주제 옵션) - 메시지의 내용을 기반으로 중복 메시지를 자동으로 제거
메시지의 내용이 동일한 메시지가 이미 전송 되었다면 해당 메시지를 중복으로 간주하고 전달하지 않음. - 암호화 - 데이터를 암호화하여 기밀성을 보장한다. 암호화 작업이 필요함에 따라 미세하게 성능 저하가 될 수 있다.
내용을 입력하고 하단의 '주제 생성' 버튼을 통해 주제를 생성할 수 있다.
2. SQS 대기열 생성
AWS SQS 콘솔 -> 우측의 '대기열 생성' 버튼 클릭하여 해당 화면으로 이동
- 유형
- 표준 대기열
- 높은 처리량을 위해 설계되었으며 메시지 전달 순서를 보장하지 않아 메시지가 전송된 순서와 실제 처리되는 순서가 다를 수 있고 중복 메시지가 발생할 수 있다.
- FIFO 대기열
- 전달 순서를 보장하여 전송된 순서대로 처리되고 중복 메시지가 발생하지 않는다. 하지만 표준 대기열에 비해 처리량이 낮지만 대부분의 사용 사례에서는 충분한 처리량을 제공한다.
- 콘텐츠 기반 중복 제거(FIFO 주제 옵션) - 메시지의 내용을 기반으로 중복 메시지를 자동으로 제거
메시지의 내용이 동일한 메시지가 이미 전송 되었다면 해당 메시지를 중복으로 간주하고 전달하지 않음. - 높은 처리량의 FIFO 대기열 - API당 초당 트랜잭션 수를 증가
- 표준 대기열
- 이름 - 대기열의 이름
- 표시 제한 시간 - 메시지를 처리하기 위해 큐에서 가져왔을 때 다른 컨슈머가 해당 메시지를 볼 수 없게 하는 시간. 이 기능을 통해 중복 처리를 방지할 수 있다.
워커가 메시지를 가져와 처리를 시작하면 해당 메시지는 설정된 시간만큼 비활성화 상태가 되어 다른 컨슈머가 볼 수 없고 처리가 완료되면 워커는 메시지를 삭제하여 큐에서 제거해야한다. - 메시지 보존 기간 - 메시지가 큐에 저장되는 최대 시간으로 메시지가 설정 기간동안 처리되지 않으면 큐에서 삭제된다.
- 전송 지연 - 새로운 메시지가 큐에 추가되고 나서 컨슈머에게 전달되기 전까지 지연시간으로, 큐에서 처리되기 전에 일정 시간 동안 대기하도록 한다.
- 최대 메시지 크기 - 큐에 저장할 수 있는 각 메시지의 최대 크기
- 메시지 수신 대기 시간 - 컨슈머가 메시지를 가져올 때 큐에 메시지가 없는 경우 컨슈머가 대기할 시간을 설정하는 기능.
- Short Polling(수신 대기 0초) - 컨슈머는 큐에서 즉시 메시지를 가져온다. 메시지가 없으면 컨슈머는 메시지를 가져오는 못하고 빈 결과를 받게 된다. 이 방식은 컨슈머가 빠르게 메시지를 처리할 수 있는 경우에 적합하지만 큐에 메시지가 없을 때 불필요한 API 호출과 비용이 발생할 수 있다.
- Long Polling(수신 대기 1초 이상) - 컨슈머는 설정된 시간 동안 큐에 메시지가 도착할 때까지 대기한다. 큐에 메시지가 없을 때 컨슈머가 대기하도록 함으로써 API 호출 횟수와 비용을 줄여주고 메시지가 큐에 도착하는 즉시 컨슈머에게 전달되므로 메시지 처리 지연 시간도 최소화할 수 있다.
- Amazon SQS 키(SSE-SQS) - SQS가 키를 관리하고 각 메시지에 대해 고유한 키를 생성한다. 이 키는 메시지가 전송되거나 저장되는 동안 사용되며, 메시지 수명이 끝날 때 제거된다. 이 방식은 키 관리에 대한 부담을 줄이고, 암호화를 사용하여 메시지의 보안 강화에 도움이 된다.
- AWS Key Management Service 키(SSE-KMS) - 사용자가 직접 키를 생성, 삭제 등 관리를 하며 KMS의 감사 기능을 통해 키 사용에 대한 로깅과 모니터링을 할 수 있다.
이를 활성화 하면 메시지의 암호화와 복호화 과정이 추가되므로 메시지 처리에 약간의 지연이 발생할 수는 있으나 전체 시스템에 큰 영향을 미치지는 않는다.
리드라이브 - 실패한 메시지를 처리하기 위해 DLQ와 연계하여 사용되는 기능. 리드라이브를 사용하면 처리에 실패한 메시지를 지정된 횟수만큼 재시도한 후 DLQ로 이동시킨다. 이를 통해 메시지 처리 실패 문제를 효과적으로 처리하고 주 대기열의 처리를 원활하게 할 수 있다.
그러나 이를 사용하기 위해서는 추가적인 큐가 필요하여 추가적인 관리가 필요할 수 있다.
배달 못한 편지 대기열(DLQ) - 메시지 처리에 실패한 경우 해당 메시지를 보관하고 처리하는데 사용되는 별도의 대기열의 활성화 여부
메시지가 여러번 처리에 실패하거나 일정 시간 동안 처리되지 않을 경우, 해당 메시지는 DLQ로 이동하여 추적, 분석, 재 처리할 수 있다.
주 대기열에서의 실패한 메시지를 별도로 관리할 수 있으므로 메시지 처리가 더 원활해지며 실패한 메시지 반복 처리가 되지 않으므로 전체적인 처리 속도가 향살될 수 있다.
추가적인 대기열이 생성되므로 대기열 및 관련 리소스에 대한 추가 비용이 발생할 수 있지만 메시지를 효과적으로 관리하고 처리할 수 있어 전체적인 시스템 비용이 절약될 수 있다.
요구사항에 맞는 옵션을 설정 후 '대기열 생성' 버튼을 통해 생성할 수 있다.
3. SNS의 주제 구독
우측의 'Amazone SNS 주제 구독' 클릭
생성한 SNS ARN 선택 후 '저장'
4. Spring Boot 에서 생성한 SNS 로 메시지 발행
- SnsClient 빈 등록
@Bean
fun snsClient(): SnsClient {
val awsCredentials: AwsBasicCredentials = AwsBasicCredentials.create(accessKey, secretKey)
return SnsClient.builder()
.credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
.region(Region.of(region))
.build()
}
accessKey, secretKey, region 은 자신의 AWS 설정에 맞게 설정한 후 SnsClient 빈을 등록한다.
- SNS 메시지를 발행할 Publisher 클래스 생성
class EventPublisher(
private val snsClient: SnsClient,
private val topicArn: String
) : EventPublisher {
fun publishEvent(message: String) {
val request = PublishRequest.builder()
.topicArn(topicArn)
.messageGroupId("groupId")
.message(message)
.build()
val response = snsClient.publish(request)
println("MessageId: ${response.messageId()}")
}
}
- EventPublisher의 빈 등록
@Value("\${cloud.aws.sns.arn}")
private lateinit var snsTopicArn: String
@Bean
fun eventPublisher(snsClient: SnsClient): EventPublisherImpl {
return EventPublisher(snsClient, snsTopicArn)
}
snsTopicArn 은 생성된 SNS 주제의 ARN 을 입력해준다.
- 메시지 발행
@Autowired
lateinit var eventPublisher: EventPublisher
fun publishEvent() {
eventPublisher.publishEvent("testMessage!")
}
5. Spring Boot 에서 SNS 에서 SQS 로 발행된 메시지 수신
@Configuration
class EventSubscriber(
@Value("\${cloud.aws.sqs.queueName}") private var queueName: String,
private val sqsClient: SqsClient,
) {
val queueUrl = sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).queueUrl()
@Scheduled(fixedRateString = "5000")
fun receiveMessages() {
val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(5)
.build()
val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
for (message in messages) {
processMessage(message.body())
deleteMessage(message, queueUrl)
}
}
private fun processMessage(messageBody: String) {
println(messageBody)
// 토픽에 맞는 동작
}
private fun deleteMessage(message: Message, queueUrl: String) {
val deleteMessageRequest: DeleteMessageRequest = DeleteMessageRequest.builder()
.queueUrl(queueUrl)
.receiptHandle(message.receiptHandle())
.build()
sqsClient.deleteMessage(deleteMessageRequest)
}
}