본문 바로가기

AWS

[AWS] AWS SNS와 SQS를 사용한 이벤트 처리 구현하기 (w.Spring Boot)

반응형

 

 

Spring 환경 (gradle)

plugins {
    id("org.springframework.boot") version "2.7.2"
    id("io.spring.dependency-management") version "1.0.12.RELEASE"
    kotlin("jvm") version "1.6.21"
    kotlin("plugin.spring") version "1.6.21"
    kotlin("plugin.jpa") version "1.6.21"
    kotlin("kapt") version "1.6.21"
}
...
extra["springCloudVersion"] = "2021.0.3"
...
dependencies {
    implementation(platform("software.amazon.awssdk:bom:2.20.54"))
    implementation("software.amazon.awssdk:sns") // SNS 모듈 추가
    implementation("software.amazon.awssdk:sqs") // SQS 모듈 추가
}

 


 

AWS SNSSQS

 

SNS 와 SQS 는 AWS 에서 제공하는 메시지 전달 서비스로, 각각의 특징을 활용해 효율적이고 안정적인 시스템 구축이 가능하다.

AWS 가 제공해준다는 이유만으로 믿을 수 있는 내구성과 확장성만으로도 사용할 이유가 충분하고 본다.

 

AWS SNS

  • Pub/Sub 모델 - publisher-subscriber 패턴을 사용해 여러 구독자에게 동시에 메시지를 전달할 수 있다. 이 특징을 통해 다양한 애플리케이션 및 서비스와 결합하여 복잡한 시스템 구축 가능.
  • 여러 프로토콜 지원 - 다양한 프로토콜을 지원한다. 이메일, SMS, AWS Lambda, AWS SQS 등에 메시지를 전송할 수 있다.
  • 확장성 - 자동으로 메시지 전달처리를 확장해주기 때문에 시스템이 성장할 때 별도의 구성이나 관리가 필요하지 않다.
  • 높은 내구성 - 여러 가용 영역에 메시지가 저장되어 서비스 중단이나 데이터 손실이 거의 없다.

 


 

AWS SQS

  • 분산 시스템 간 메시지 전달 - 분산되어있는 서비스 간 메시지를 안전하게 전달하고, 실패한 메시지는 자동으로 재시도한다. 이로 인해 분산 서비스 간의 통신을 안정적으로 유지할 수 있다.
  • 비동기 처리 - 메시지 큐를 이요해 작업을 비동기적으로 처리할 수 있어 시스템의 응답시간을 개선하고, 다양한 작업을 동시에 처리할 수 있다.
  • 확장성 - 자동으로 큐를 확장하고 관리해 주기 때문에 시스템 성장 시 별도의 구성이나 관리가 필요하지 않다.
  • 높은 내구성 - SNS 와 마찬가지로 여러 가용 영역에 메시지가 저장되어 서비스 중단이나 데이터 손실이 거의 없다.

 

 


 

Spring Boot를 사용해 SNS와 SQS를 통한 이벤트 처리 구현

 

1. SNS 주제 생성

 

 

AWS SNS 콘솔 -> 좌측 탭의 '주제' -> 우측 상단의 '주제 생성' 버튼 클릭하여 해당 화면으로 이동

  • 유형
    • 표준 주제
      • 고성능, 확장이 가능하며 비용 효율적인 메세징 솔루션을 제공한다. 메시지의 순서는 보장되지 않아 나중에 보낸 메시지가 먼저 도착할 수 있고 오류 등의 이유로 인해 메시지가 여러번 보내질 경우 중복된 메시지가 전달될 수 있다.
      • 알림 및 경고, 비동기 작업, 이벤트 기반 아키텍쳐 등에 사용
    • FIFO 주제
      • 순서를 보장하는 메세징 솔루션을 제공하고 중복 메시지에 대한 처리가 가능해 오류로 인해 메시지가 여러번 보내져도 메시지는 한번만 전달된다. 하지만 FIFO 주제는  처리량이 제한되어있고(초당 300개) 이러한 제한 및 기능으로 성능이 다소 떨어질 수 있다.
      • 순서 보장 작업, 트랜잭션 처리, 워크플로우 관리 등에 사용
  • 이름 - 주제의 이름
  • 표시 이름 - 이메일, 이메일-JSON 프로토콜 사용할 때 메시지의 발신자 이름
  • 콘텐츠 기반 메시지 중복 제거(FIFO 주제 옵션) - 메시지의 내용을 기반으로 중복 메시지를 자동으로 제거
    메시지의 내용이 동일한 메시지가 이미 전송 되었다면 해당 메시지를 중복으로 간주하고 전달하지 않음.
  • 암호화 - 데이터를 암호화하여 기밀성을 보장한다. 암호화 작업이 필요함에 따라 미세하게 성능 저하가 될 수 있다.

내용을 입력하고 하단의 '주제 생성' 버튼을 통해 주제를 생성할 수 있다.

 


 

2. SQS 대기열 생성

 

 

AWS SQS 콘솔 -> 우측의 '대기열 생성' 버튼 클릭하여 해당 화면으로 이동

  • 유형
    • 표준 대기열
      • 높은 처리량을 위해 설계되었으며 메시지 전달 순서를 보장하지 않아 메시지가 전송된 순서와 실제 처리되는 순서가 다를 수 있고 중복 메시지가 발생할 수 있다.
    • FIFO 대기열
      • 전달 순서를 보장하여 전송된 순서대로 처리되고 중복 메시지가 발생하지 않는다. 하지만 표준 대기열에 비해 처리량이 낮지만 대부분의 사용 사례에서는 충분한 처리량을 제공한다.
      • 콘텐츠 기반 중복 제거(FIFO 주제 옵션) - 메시지의 내용을 기반으로 중복 메시지를 자동으로 제거
        메시지의 내용이 동일한 메시지가 이미 전송 되었다면 해당 메시지를 중복으로 간주하고 전달하지 않음.
      • 높은 처리량의 FIFO 대기열 - API당 초당 트랜잭션 수를 증가
  • 이름 - 대기열의 이름

 

 

  • 표시 제한 시간 - 메시지를 처리하기 위해 큐에서 가져왔을 때 다른 컨슈머가 해당 메시지를 볼 수 없게 하는 시간. 이 기능을 통해 중복 처리를 방지할 수 있다.
    워커가 메시지를 가져와 처리를 시작하면 해당 메시지는 설정된 시간만큼 비활성화 상태가 되어 다른 컨슈머가 볼 수 없고 처리가 완료되면 워커는 메시지를 삭제하여 큐에서 제거해야한다.
  • 메시지 보존 기간 - 메시지가 큐에 저장되는 최대 시간으로 메시지가 설정 기간동안 처리되지 않으면 큐에서 삭제된다.
  • 전송 지연 - 새로운 메시지가 큐에 추가되고 나서 컨슈머에게 전달되기 전까지 지연시간으로, 큐에서 처리되기 전에 일정 시간 동안 대기하도록 한다.
  • 최대 메시지 크기 - 큐에 저장할 수 있는 각 메시지의 최대 크기
  • 메시지 수신 대기 시간 - 컨슈머가 메시지를 가져올 때 큐에 메시지가 없는 경우 컨슈머가 대기할 시간을 설정하는 기능.
    • Short Polling(수신 대기 0초) - 컨슈머는 큐에서 즉시 메시지를 가져온다. 메시지가 없으면 컨슈머는 메시지를 가져오는 못하고 빈 결과를 받게 된다. 이 방식은 컨슈머가 빠르게 메시지를 처리할 수 있는 경우에 적합하지만 큐에 메시지가 없을 때 불필요한 API 호출과 비용이 발생할 수 있다.
    • Long Polling(수신 대기 1초 이상) - 컨슈머는 설정된 시간 동안 큐에 메시지가 도착할 때까지 대기한다. 큐에 메시지가 없을 때 컨슈머가 대기하도록 함으로써 API 호출 횟수와 비용을 줄여주고 메시지가 큐에 도착하는 즉시 컨슈머에게 전달되므로 메시지 처리 지연 시간도 최소화할 수 있다.

 

 

  • Amazon SQS 키(SSE-SQS) - SQS가 키를 관리하고 각 메시지에 대해 고유한 키를 생성한다. 이 키는 메시지가 전송되거나 저장되는 동안 사용되며, 메시지 수명이 끝날 때 제거된다. 이 방식은 키 관리에 대한 부담을 줄이고, 암호화를 사용하여 메시지의 보안 강화에 도움이 된다.
  • AWS Key Management Service 키(SSE-KMS) - 사용자가 직접 키를 생성, 삭제 등 관리를 하며 KMS의 감사 기능을 통해 키 사용에 대한 로깅과 모니터링을 할 수 있다.

이를 활성화 하면 메시지의 암호화와 복호화 과정이 추가되므로 메시지 처리에 약간의 지연이 발생할 수는 있으나 전체 시스템에 큰 영향을 미치지는 않는다.

 

 

리드라이브 - 실패한 메시지를 처리하기 위해 DLQ와 연계하여 사용되는 기능. 리드라이브를 사용하면 처리에 실패한 메시지를 지정된 횟수만큼 재시도한 후 DLQ로 이동시킨다. 이를 통해 메시지 처리 실패 문제를 효과적으로 처리하고 주 대기열의 처리를 원활하게 할 수 있다.

그러나 이를 사용하기 위해서는 추가적인 큐가 필요하여 추가적인 관리가 필요할 수 있다.

 

 

배달 못한 편지 대기열(DLQ) - 메시지 처리에 실패한 경우 해당 메시지를 보관하고 처리하는데 사용되는 별도의 대기열의 활성화 여부
메시지가 여러번 처리에 실패하거나 일정 시간 동안 처리되지 않을 경우, 해당 메시지는 DLQ로 이동하여 추적, 분석, 재 처리할 수 있다.

주 대기열에서의 실패한 메시지를 별도로 관리할 수 있으므로 메시지 처리가 더 원활해지며 실패한 메시지 반복 처리가 되지 않으므로 전체적인 처리 속도가 향살될 수 있다.

추가적인 대기열이 생성되므로 대기열 및 관련 리소스에 대한 추가 비용이 발생할 수 있지만 메시지를 효과적으로 관리하고 처리할 수 있어 전체적인 시스템 비용이 절약될 수 있다.

 

요구사항에 맞는 옵션을 설정 후 '대기열 생성' 버튼을 통해 생성할 수 있다.

 


 

3.  SNS의 주제 구독

 

 

우측의 'Amazone SNS 주제 구독' 클릭

 

 

생성한 SNS ARN 선택 후 '저장'

 


 

4.  Spring Boot 에서 생성한 SNS 로 메시지 발행

 

- SnsClient 빈 등록

@Bean
fun snsClient(): SnsClient {
    val awsCredentials: AwsBasicCredentials = AwsBasicCredentials.create(accessKey, secretKey)
    return SnsClient.builder()
        .credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
        .region(Region.of(region))
        .build()
}

accessKey, secretKey, region 은 자신의 AWS 설정에 맞게 설정한 후 SnsClient 빈을 등록한다.

 

- SNS 메시지를 발행할 Publisher 클래스 생성

class EventPublisher(
    private val snsClient: SnsClient,
    private val topicArn: String
) : EventPublisher {

    fun publishEvent(message: String) {
        val request = PublishRequest.builder()
            .topicArn(topicArn)
            .messageGroupId("groupId")
            .message(message)
            .build()
        val response = snsClient.publish(request)
        println("MessageId: ${response.messageId()}")
    }
}

 

- EventPublisher의 빈 등록

@Value("\${cloud.aws.sns.arn}")
private lateinit var snsTopicArn: String

@Bean
fun eventPublisher(snsClient: SnsClient): EventPublisherImpl {
    return EventPublisher(snsClient, snsTopicArn)
}

 

snsTopicArn 은 생성된 SNS 주제의 ARN 을 입력해준다.

 

- 메시지 발행

@Autowired
lateinit var eventPublisher: EventPublisher

fun publishEvent() {
    eventPublisher.publishEvent("testMessage!")
}

 


 

5.  Spring Boot 에서 SNS 에서 SQS 로 발행된 메시지 수신

 

@Configuration
class EventSubscriber(
    @Value("\${cloud.aws.sqs.queueName}") private var queueName: String,
    private val sqsClient: SqsClient,
) {

    val queueUrl = sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).queueUrl()
    
    @Scheduled(fixedRateString = "5000")
    fun receiveMessages() {
        val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder()
            .queueUrl(queueUrl)
            .maxNumberOfMessages(10)
            .waitTimeSeconds(5)
            .build()
        val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages()
        for (message in messages) {
            processMessage(message.body())
            deleteMessage(message, queueUrl)
        }
    }

    private fun processMessage(messageBody: String) {
        println(messageBody)
        // 토픽에 맞는 동작
    }

    private fun deleteMessage(message: Message, queueUrl: String) {
        val deleteMessageRequest: DeleteMessageRequest = DeleteMessageRequest.builder()
            .queueUrl(queueUrl)
            .receiptHandle(message.receiptHandle())
            .build()
        sqsClient.deleteMessage(deleteMessageRequest)
    }
}

 

반응형