SQS Listener 구축
- 의존성 추가
implementation 'org.springframework.cloud:spring-cloud-starter-aws-messaging'
2. SQS 주소 Properties 생성
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Getter;
@Component
@Getter
@ConfigurationProperties("sqs")
public class AwsSqsNameProperties {
private String sqsExample;
}
- application.yml
sqs:
sqsExample: [SQS 주소]
cloud:
aws:
region:
static: 리전 정보(ex: ap-northeast-2)
stack:
auto: false
credentials:
access-key: IAM 에서 발급받은 엑세스키
secret-key: IAM에서 발급받은 시크릿키
SQS 주소는 빨간 부분을 넣어주시면 됩니다.
3. SQSConfig 작성
@EnableSqs
@Configuration
@EnableConfigurationProperties(value = {AwsSqsNameProperties.class})
@RequiredArgsConstructor
public class SQSConfig {
public static final ObjectMapper objectMapper = Jackson2ObjectMapperBuilder.json()
.simpleDateFormat("yyyy-MM-dd HH:mm:ss")
.timeZone("Asia/Seoul")
.build();
public static final MappingJackson2MessageConverter JSON_MESSAGE_CONVERTER;
static {
JSON_MESSAGE_CONVERTER = new MappingJackson2MessageConverter();
JSON_MESSAGE_CONVERTER.setSerializedPayloadClass(String.class);
JSON_MESSAGE_CONVERTER.setStrictContentTypeMatch(false);
JSON_MESSAGE_CONVERTER.setObjectMapper(objectMapper);
}
// Resolver 연결 특정 객체로 변경할려고 할때 작성
@Bean
public QueueMessageHandlerFactory queueMessageHandlerFactory() {
QueueMessageHandlerFactory factory = new QueueMessageHandlerFactory();
factory.setArgumentResolvers(unmodifiableList(asList(
new MessageResolver(
new NotificationRequestConverter(JSON_MESSAGE_CONVERTER))
)));
return factory;
}
// SQS Listener 빈등록
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
AmazonSQSAsync amazonSQSAsync, AsyncTaskExecutor taskExecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQSAsync);
factory.setMaxNumberOfMessages(10);
factory.setAutoStartup(true);
factory.setBackOffTime(1000L);
factory.setTaskExecutor(taskExecutor);
return factory;
}
// 리젼 정의
@Bean
public RegionProvider regionProvider() {
return new StaticRegionProvider(Regions.AP_NORTHEAST_2.getName());
}
}
4. SqsListener 정의
@Slf4j
@Component
@RequiredArgsConstructor
public class SqsListener {
@SqsListener(value = "${sqs.sqsExample}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
@Override
public void receiveEvent(MessageDTO message) throws IOException {
log.info("", message);
}
}
@SqsListener 만 지정해 주면 해당 메서드에 Queue가 읽어 옵니다.
- Resolver를 쓰지 않고 String으로 설정 시 String 값으로 넘어오는 것을 확인할 수 있습니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class SqsListener {
@SqsListener(value = "${sqs.sqsExample}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
@Override
public void receiveEvent(String message) throws IOException {
log.info("", message);
}
}
- deletionPolicy는 SQS Listener에 4가지 삭제 정책(SqsMessageDeletionPolicy)을 설정합니다.
- ALWAYS : 리스너 메서드에 의한 메시지 처리 중 성공( 예외 발생 없음) 또는 실패 (예외 발생) 시 항상 메시지 삭제
- NEVER : 메시지를 자동으로 삭제하지 않습니다. 수신 확인 ( Acknowledgment )로 명시적으로 삭제가 가능
- NO_REDRIVE : Redrive pollcy(DeadLetterQueue)가 정의되지 않은 경우 메시지를 삭제
- ON_SUCCESS : 리스터 메서드에 의해 성공적으로 실행되면 메시지를 삭제합니다.
5. Resolve 설정
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.Message;
@Slf4j
@AllArgsConstructor
public class MessageResolver implements HandlerMethodArgumentResolver {
private final MessageConverter messageConverter;
@Override
public boolean supportsParameter(MethodParameter parameter) {
return isAssignable(MessageDTO.class, parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception {
Object convertMessage = messageConverter.fromMessage(message, MessageDTO.class);
return ((NotificationRequestConverter.NotificationRequest) convertMessage).getMessage();
}
}
위에 설정을 하면 보통 연결이 정상적으로 되는 것을 확인 가능하세요.
작업 중 오류 발생 상황
1. SQS 읽지 못했다는 오류 확인
WARN o.s.c.a.m.l.SimpleMessageListenerContainer@queueAttributes(329) - Ignoring queue with name '[SQS명]': The queue does not exist.; nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.NonExistentQueue; Request ID: 86246416-3974-5cd5-93ff-389aa43357b9; Proxy: null)
해당 오류가 발생하는 원인
- SQS 권한을 막아두었을 때 발생
- SQS 주소가 이상이 있는지 확인
2. application.yml에 IAM 키가 Open 되는 이슈
보안상 yml에 보안키가 들어있게 되면 해킹에 위험이 있어서 해당 문제를 해결이 필요
1. AwsCredentialConfiguration 추가
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.utils.StringUtils;
@Configuration
public class AwsCredentialConfiguration {
@Bean
public AwsCredentialsProvider awsCredentialsProviderOnLocal(Environment environment) {
// 해당 방식으로 값을 불러옵니다.
String accessKeyId = getEnviromentValue(environment, "AWS_ACCESS_KEY_ID", "AWS_ACCESS_KEY_ID");
String secretAccessKey = getEnviromentValue(environment, "AWS_SECRET_ACCESS_KEY", "AWS_SECRET_ACCESS_KEY");
// 해당 부분을 다듬어서 보내주기만 하면됩니다.
return AwsCredentialsProviderChain.builder().credentialsProviders(() -> AwsBasicCredentials.create(accessKeyId, secretAccessKey)).build();
}
}
2. SQSConfig에 AmazonSQSAsync 추가
@EnableSqs
@Configuration
@EnableConfigurationProperties(value = {AwsSqsNameProperties.class})
@RequiredArgsConstructor
public class SQSConfig {
...
// AmazonSQSAsync 에 정보 추가
@Primary
@Bean
public AmazonSQSAsync amazonSQSAws(AwsCredentialsProvider awsCredentialsProvider) {
BasicAWSCredentials awsCreds = new BasicAWSCredentials(awsCredentialsProvider.resolveCredentials().accessKeyId(), awsCredentialsProvider.resolveCredentials().secretAccessKey());
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(String.valueOf(Region.AP_NORTHEAST_2))
.withCredentials(new AWSStaticCredentialsProvider(awsCreds))
.build();
}
...
}
3. application.yml 제거
// 프로젝트 배포시 기본으로 CloudFormation 구성을 시작하기 때문에 설정한
// CloudFormation이 없으면 프로젝트 실행이 되지 않음. 해당 기능을 사용하지 않도록 false로 설정.
cloud:
aws:
stack:
auto: false
해당 부분은 위에 이슈로 인해 두고 나머지 설정 부분은 SQSConfig에 추가합니다.
4. 실행
jar 실행시
AWS_ACCESS_KEY_ID=[Access key]; AWS_SECRET_ACCESS_KEY=[Secret key]
3. SQS를 읽어는 지나 실제 Listener까지 오지 않는 현상
- 문제상황
첫 번째 블락까지 정상적으로 SQS가 전부 읽어집니다. 그러나 두 번째 블락에서 멈추는 현상 발생 그 후 일정 시간이 지나면 SQS가 닫히는 현상
- 원인
AsynThreadPool을 이용하는 해당 프로젝트에서 따로 설정을 안 해두니 ThreadPool이 0으로 잡히면서 await() 부분에서 스레드가 풀려서 진행될 때까지 무한정 기다리는 현상
- 해결 방법
AsynThreadPool 추가
@EnableSqs
@Configuration
@EnableConfigurationProperties(value = {AwsSqsNameProperties.class})
@RequiredArgsConstructor
public class SQSConfig {
...
// ThreadPool 추가
@Bean(name = "sqsThreadPoolTaskScheduler")
public Executor sqsThreadPoolTaskScheduler() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(10);
taskExecutor.setThreadNamePrefix("sqsThreadPool-");
taskExecutor.initialize();
return taskExecutor;
}
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(
AmazonSQSAsync amazonSQSAsync,
@Qualifier("sqsThreadPoolTaskScheduler") Executor taskExecutor) {
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSQSAsync);
factory.setMaxNumberOfMessages(10);
factory.setAutoStartup(true);
factory.setBackOffTime(1000L);
// 이 부분에 해당 taskExecutor 등록
factory.setTaskExecutor((AsyncTaskExecutor) taskExecutor);
return factory;
}
...
}
'Java > Spring' 카테고리의 다른 글
[Spring] Rest API 통신 방법 ( RestTemplate vs FeignClient vs WebClient ) (0) | 2023.05.05 |
---|---|
[Spring] AOP (0) | 2023.03.23 |
[Spring] MapStruct 적용 방법 (0) | 2023.03.21 |
[Java] Spring Validation Library (1) | 2023.03.12 |
[Java] Faker Library (2) | 2023.03.11 |