본문 바로가기

AWS

[aws] aws iot core도입하여 spring/kotlin에서 mqtt 구독 및 발행

'AWS로 구현하는 MAS와 컨테이너 오캐스트레이션' 강의를

블로그를 통해 구매하시는 분들에게만 10%할인 중입니다.

이미지를 클릭하고, 아래 쿠폰번호를 입력해 주세요!

16861-259843d6c2d7


 

aws iot core + spring/kotlin

aws iot 좌측 탭 관리 -> 사물 화면에서 오른쪽 생성으로 환경에 맞는 사물 생성 (인증서는 aws 에서 권장하는 원클릭 인증서 추가)

 

퍼블릭 및 프라이빗 키는 현재 페이지 이탈 시 다운로드 불가능 하므로 로컬 및 서버에 저장하고 활성화 후 정책 연결 진행

 

정책에서는 구독 및 발행할 토픽, 연결 허용할 클라이이언트 id 설정할 수 있음

(허용되지 않은 토픽, 클라이언트id는 연결 안 됨)

 

 

    val rootCaPath =
        "/Users/mac/Documents/aws/connect_device_package/root-CA.crt" // 자격 증명을 가져 오는 데 사용되는 루트 인증서 경로 - http 통신 시

    @Value("\${cloud.aws.iot.client-id}")
    lateinit var clientId: String // 연결할 때 사용할 클라이언트 ID (선택 사항)

    @Value("\${cloud.aws.iot.cert-path}")
    lateinit var certPath: String// 자격 증명을 가져 오는 데 사용되는 IoT 사물 인증서의 경로

    @Value("\${cloud.aws.iot.private-key-path}")
    lateinit var keyPath: String // 자격 증명을 가져 오는 데 사용되는 IoT 사물 개인 키에 대한 경로

    @Value("\${cloud.aws.iot.endpoint}")
    lateinit var endpoint: String // AWS IoT 서비스 엔드 포인트 호스트 이름

    @Value("\${cloud.aws.iot.port}")
    var port: Int? = null// endpoint 연결할 포트

    val topic = "hit-up"
    val message = "Hello World!"
    val messagesToPublish = 1 // 게시 할 메시지 수 (선택 사항)
    
    @Bean
    fun awsIotMqtt() {
        val callbacks: MqttClientConnectionEvents = object : MqttClientConnectionEvents {
            override fun onConnectionInterrupted(errorCode: Int) {
                if (errorCode != 0) {
                    println("Connection interrupted: " + errorCode + ": " + CRT.awsErrorString(errorCode))
                }
            }

            override fun onConnectionResumed(sessionPresent: Boolean) {
                println("Connection resumed: " + if (sessionPresent) "existing session" else "clean session")
            }
        }

        val eventLoopGroup = EventLoopGroup(1)
        val resolver = HostResolver(eventLoopGroup)
        val clientBootstrap = ClientBootstrap(eventLoopGroup, resolver)
        val builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certPath, keyPath)

//        if (rootCaPath != null) {
//            builder.withCertificateAuthorityFromPath(null, rootCaPath)
//        }

        builder.withBootstrap(clientBootstrap)
            .withConnectionEventCallbacks(callbacks)
            .withClientId(clientId)
            .withEndpoint(endpoint)
            .withPort(port!!.toShort())
            .withCleanSession(true)
            .withProtocolOperationTimeoutMs(60000)

        try {
            val connection = builder.build()
            val connected = connection.connect()
            try {
                val sessionPresent = connected.get()
                println("Connected to ${if (sessionPresent) "new" else "existing"} session!");
            } catch (ex: Exception) {
                throw RuntimeException("Exception occurred during connect", ex)
            }

            val countDownLatch = CountDownLatch(messagesToPublish)

            val subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE) { message ->
                val payload = String(message.payload, StandardCharsets.UTF_8)
                println("MESSAGE: $payload")
                countDownLatch.countDown()
            }

            subscribed.get()

            var count = 0
            while (count++ < messagesToPublish) {
                val mqttMessage = MqttMessage(topic, message.toByteArray(), QualityOfService.AT_LEAST_ONCE, false)
                val published = connection.publish(mqttMessage)
                published.get()
                Thread.sleep(1000)
            }

            countDownLatch.await()

//            val disconnected = connection.disconnect()
//            disconnected.get()
        } catch (ex: ExecutionException) {
            println("Exception encountered: $ex")
        } catch (ex: CrtRuntimeException) {
            println("Exception encountered: $ex")
        } catch (ex: InterruptedException) {
            println("Exception encountered: $ex")
        }
        CrtResource.waitForNoResources()

        println("Complete!")
    }

https://github.com/aws/aws-iot-device-sdk-java-v2

해당 aws 의 예제를 보고 위의 사물 정보에 맞게 설정

  • port - ssl인증서 적용 시 8443포트 / 그 외 443포트 사용
  • endpoint - aws 계정에 연결할 디바이스 엔드포인트 (aws iot 콘솔의 좌츨 하단 설정 탭에서 확인 가능)
  • clientId - 연결에 사용할 clientId (권한 허용 필요)
  • topic - 구독 및 발행할 토픽(권한 허용 필요)
  • keyPath/certPath - 사물 생성 시 발급받은 인증서 및 privateKey 파일의 경로

 

  • withCleanSession - 이전 세션 연결을 유지(true) 또는 삭제(false)
  • withProtocolOperationTimeoutMs - 발행 및 구독 취소(qos > 0) 시 응답 제한시간 ms
  • QualityOfService(qos) 레벨 0 - 전송 결과에 상관없이 한 번만 보낸다. 신뢰 보장하지 않음
  • QualityOfService(qos) 레벨 1 - 최소 한 번 보내고 PUBACK을 받지 않으면 다시 한 번 더 보낸다. 결과의 신뢰도가 높지만 PUBACK을 Broker에서 유실 시 중복된 데이터를 받는 경우가 생길 수 있다. 
  • QualityOfService(qos) 레벨 2 - PUBACK 과정을 3way handshaking으로 1번 전달을 보장한다. 하지만 과정이 길어지므로 전송 속도는 저하된다.

해당 코드로 연결 성공시 출력되는 로그
aws 콘솔에서의 구독 및 발행 테스트

정상 연결 시

aws 콘솔에서 테스트 발행을 시도하면 구독하고있던 클라이언트 서버에서 수신을 하게 된다.

 

 

참고 자료