SQS Listener 구축

  1. 의존성 추가
implementation 'org.springframework.cloud:spring-cloud-starter-aws-messaging'

   2. SQS 주소 Properties 생성

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Getter;

@Component
@Getter
@ConfigurationProperties("sqs")
public class AwsSqsNameProperties {
     private String sqsExample;
}

 

  • application.yml
sqs:
 sqsExample: [SQS 주소]
 
cloud:
 aws:
  region:
   static: 리전 정보(ex: ap-northeast-2)
  stack:
   auto: false
  credentials:
   access-key: IAM 에서 발급받은 엑세스키
   secret-key: IAM에서 발급받은 시크릿키

SQS 주소는 빨간 부분을 넣어주시면 됩니다.

  3. SQSConfig 작성

@EnableSqs
@Configuration
@EnableConfigurationProperties(value = {AwsSqsNameProperties.class})
@RequiredArgsConstructor
public class SQSConfig {


    public static final ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json()
        .simpleDateFormat("yyyy-MM-dd HH:mm:ss")
        .timeZone("Asia/Seoul")
        .build();

    public static final MappingJackson2MessageConverter JSON_MESSAGE_CONVERTER;

    static {
        JSON_MESSAGE_CONVERTER = new MappingJackson2MessageConverter();
        JSON_MESSAGE_CONVERTER.setSerializedPayloadClass(String.class);
        JSON_MESSAGE_CONVERTER.setStrictContentTypeMatch(false);
        JSON_MESSAGE_CONVERTER.setObjectMapper(objectMapper);
    }

    // Resolver 연결 특정 객체로 변경할려고 할때 작성
    @Bean
    public QueueMessageHandlerFactory queueMessageHandlerFactory() {
        QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
        factory.setArgumentResolvers(unmodifiableList(asList(
            new MessageResolver(
                new NotificationRequestConverter(JSON_MESSAGE_CONVERTER))
        )));
        return factory;
    }

    // SQS Listener 빈등록
    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
        AmazonSQSAsync amazonSQSAsync, AsyncTaskExecutor taskExecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(10);
        factory.setAutoStartup(true);
        factory.setBackOffTime(1000L);
        factory.setTaskExecutor(taskExecutor);
        return factory;
    }


    // 리젼 정의
    @Bean
    public RegionProvider regionProvider() {
        return new StaticRegionProvider(Regions.AP_NORTHEAST_2.getName());
    }
}

   4. SqsListener 정의

@Slf4j
@Component
@RequiredArgsConstructor
public class SqsListener {

    @SqsListener(value = "${sqs.sqsExample}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    @Override
    public void receiveEvent(MessageDTO message) throws IOException {
       log.info("", message);
    }
    
}

@SqsListener 만 지정해 주면 해당 메서드에 Queue가 읽어 옵니다.

  • Resolver를 쓰지 않고 String으로 설정 시 String 값으로 넘어오는 것을 확인할 수 있습니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class SqsListener {

    @SqsListener(value = "${sqs.sqsExample}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    @Override
    public void receiveEvent(String message) throws IOException {
       log.info("", message);
    }
    
}
  • deletionPolicy는 SQS Listener에 4가지 삭제 정책(SqsMessageDeletionPolicy)을 설정합니다.
  • ALWAYS : 리스너 메서드에 의한 메시지 처리 중 성공( 예외 발생 없음) 또는 실패 (예외 발생) 시 항상 메시지 삭제
  • NEVER : 메시지를 자동으로 삭제하지 않습니다. 수신 확인 ( Acknowledgment )로 명시적으로 삭제가 가능
  • NO_REDRIVE : Redrive pollcy(DeadLetterQueue)가 정의되지 않은 경우 메시지를 삭제
  • ON_SUCCESS : 리스터 메서드에 의해 성공적으로 실행되면 메시지를 삭제합니다. 

  5. Resolve 설정

import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.Message;

@Slf4j
@AllArgsConstructor
public class MessageResolver implements HandlerMethodArgumentResolver {
    private final MessageConverter messageConverter;

    @Override
    public boolean supportsParameter(MethodParameter parameter) {
        return isAssignable(MessageDTO.class, parameter.getParameterType());
    }

    @Override
    public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
        Object convertMessage = messageConverter.fromMessage(message, MessageDTO.class);
        return ((NotificationRequestConverter.NotificationRequest) convertMessage).getMessage();
      
    }
}

위에 설정을 하면 보통 연결이 정상적으로 되는 것을 확인 가능하세요.

 

작업 중 오류 발생 상황

1. SQS 읽지 못했다는 오류 확인

WARN o.s.c.a.m.l.SimpleMessageListenerContainer@queueAttributes(329) - Ignoring queue with name '[SQS명]': The queue does not exist.; nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.NonExistentQueue; Request ID: 86246416-3974-5cd5-93ff-389aa43357b9; Proxy: null)

해당 오류가 발생하는 원인

  • SQS 권한을 막아두었을 때 발생
  • SQS 주소가 이상이 있는지 확인

2. application.yml에 IAM 키가 Open 되는 이슈

보안상 yml에 보안키가 들어있게 되면 해킹에 위험이 있어서 해당 문제를 해결이 필요

1. AwsCredentialConfiguration 추가

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.utils.StringUtils;

@Configuration
public class AwsCredentialConfiguration {

    @Bean
    public AwsCredentialsProvider awsCredentialsProviderOnLocal(Environment environment) {
        // 해당 방식으로 값을 불러옵니다.
        String accessKeyId = getEnviromentValue(environment, "AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID");
        String secretAccessKey = getEnviromentValue(environment, "AWS_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY");
        // 해당 부분을 다듬어서 보내주기만 하면됩니다.
        return AwsCredentialsProviderChain.builder().credentialsProviders(() -> AwsBasicCredentials.create(accessKeyId, secretAccessKey)).build();
    }

}

2. SQSConfig에 AmazonSQSAsync 추가

@EnableSqs
@Configuration
@EnableConfigurationProperties(value = {AwsSqsNameProperties.class})
@RequiredArgsConstructor
public class SQSConfig {
   ...
   
 // AmazonSQSAsync 에 정보 추가
 @Primary
 @Bean
 public AmazonSQSAsync amazonSQSAws(AwsCredentialsProvider awsCredentialsProvider) {
  BasicAWSCredentials awsCreds = new BasicAWSCredentials(awsCredentialsProvider.resolveCredentials().accessKeyId(), awsCredentialsProvider.resolveCredentials().secretAccessKey());
  return AmazonSQSAsyncClientBuilder.standard()
            .withRegion(String.valueOf(Region.AP_NORTHEAST_2))
            .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
            .build();
 }
   
   ...
}

  3. application.yml 제거

// 프로젝트 배포시 기본으로 CloudFormation 구성을 시작하기 때문에 설정한
// CloudFormation이 없으면 프로젝트 실행이 되지 않음. 해당 기능을 사용하지 않도록 false로 설정.
cloud:
 aws:
  stack:
   auto: false

해당 부분은 위에 이슈로 인해 두고 나머지 설정 부분은 SQSConfig에 추가합니다.

  4. 실행

jar 실행시 

AWS_ACCESS_KEY_ID=[Access key]; AWS_SECRET_ACCESS_KEY=[Secret key]

3. SQS를 읽어는 지나 실제 Listener까지 오지 않는 현상

  • 문제상황

SimpleMessageListenerContainer.java

첫 번째 블락까지 정상적으로 SQS가 전부 읽어집니다. 그러나 두 번째 블락에서 멈추는 현상 발생 그 후 일정 시간이 지나면 SQS가 닫히는 현상

  • 원인

AsynThreadPool을 이용하는 해당 프로젝트에서 따로 설정을 안 해두니 ThreadPool이 0으로 잡히면서 await() 부분에서 스레드가 풀려서 진행될 때까지 무한정 기다리는 현상

  • 해결 방법

AsynThreadPool 추가

@EnableSqs
@Configuration
@EnableConfigurationProperties(value = {AwsSqsNameProperties.class})
@RequiredArgsConstructor
public class SQSConfig {

  ...
  
 // ThreadPool 추가  
 @Bean(name = "sqsThreadPoolTaskScheduler")
 public Executor sqsThreadPoolTaskScheduler() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(3);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.setQueueCapacity(10);
    taskExecutor.setThreadNamePrefix("sqsThreadPool-");
    taskExecutor.initialize();
    return taskExecutor;
 }
 
 @Bean
 public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
        AmazonSQSAsync amazonSQSAsync,
        @Qualifier("sqsThreadPoolTaskScheduler") Executor taskExecutor) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setMaxNumberOfMessages(10);
        factory.setAutoStartup(true);
        factory.setBackOffTime(1000L);
        // 이 부분에 해당 taskExecutor 등록
        factory.setTaskExecutor((AsyncTaskExecutor) taskExecutor);
        return factory;
 }
  
  ...
  
}

'Java > Spring' 카테고리의 다른 글

[Spring] Rest API 통신 방법 ( RestTemplate vs FeignClient vs WebClient )  (0) 2023.05.05
[Spring] AOP  (0) 2023.03.23
[Spring] MapStruct 적용 방법  (0) 2023.03.21
[Java] Spring Validation Library  (1) 2023.03.12
[Java] Faker Library  (2) 2023.03.11

+ Recent posts