일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- RequestBody
- enumSet
- backend
- 소프티어
- mapstruct
- Test Doulbe
- Spring
- Service 계층 테스트
- JPQL
- db
- ExceptionResolver
- FCM
- Test
- modelmapper
- Coputer Science
- 공룡책
- 인프콘2023
- OS
- Junit 5
- Server
- MySQL
- Java
- proxyFactory
- Test code
- 테크쇼
- ObjectMapper
- 일상
- JPA
- 자바
- softeer
- Today
- Total
공부내용공유
kafka 검증, 예외 처리하기 (feat: @Valid, CommonErrorHandler) 본문
서론
최근 사내에서 카프카 컨슈머 어플리케이션을 개발하게되었다.
최근 들어서 어플리케이션을 개발할 때 예외처리와 로깅을 통한 디버깅의 용이성을 증대시키는 것의 중요성을 많이 느끼고 있어서 나름 열심히 알아보고 적용하였는데 해당 내용을 간단하게 정리하기 위해 글을 작성하였다. (Error Hanlder 에 관한 구체적인 내용은 검색해도 많이 나오지 않는 것 같아 해당 부분에 대해 좀 더 자세히 작성할 예정이다.)
본론
이번 글의 목차는
- kafka event validation
- spring kafka의 여러가지 error handler
로 구성될 예정이다.
kafka Event Validation
사실 해당 내용은 구글 검색하면 잘 작성된 spring docs로 바로 확인이 가능하다, 그래도 내 글을 한번 봐주고 docs가서 확인했으면 좋겠다. (뒤에서 error handler 몇 개도 간략히 소개한다.)
Spring MVC에서 제공해주는 validator(spring-validator-starter) 는 다들 많이 사용해봤을거라고 생각한다.
class Dto {
@NotNull
private int id;
@NotBlank
private String name;
@Number
private int age;
}
public ResponseEntity updateInfo(@Valid @RequestBody Dto dto) {
...
}
이런식으로 많은 어노테이션을 제공해줘서 간편하게 검증이 가능하고 좀 더 상세하게 검증을 하고 싶다면 커스텀 어노테이션을 만들어서 사용할 수 있다. 카프카 이벤트 또한 spring-validation-starter를 간편하게 사용할 수 있다.
- 자신이 사용하는 Kafka Config에 KafkaListenerConfigurer 인터페이스를 구현하게 한다.
- configureKafkaListeners 라는 메서드를 구현하게 되는데 해당 메서드에서 원하는 값을 설정할 수 있다.
- spring-validator-starter에서 제공해주는 LocalValidatorFactoryBean 를 주입받는다.
- 메서드에서 주입받은 validator를 설정해준다. (아래 코드에서 자세히 구현할 예정)
- 그다음은 spring mvc 처럼 Dto 파라미터에 @Valid , Dto 필드에 원하는 검증 어노테이션을 사용한다.
코드로 예시를 보여주면
@Configuration
@RequiredArgsConstructor
public class KafkaConfig implements KafkaListenerConfigurer {
private final LocalValidatorFactoryBean validator;
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
class Dto {
@NotNull
private int id;
@NotBlank
private String name;
@Number
private int age;
}
@KafkaListener(...)
public void validationListner(@Valid @Payload Dto dto) {
...
}
이런식이다!
이렇게 설정을 하고 실제로 형식에 맞지 않는 이벤트를 produce하면 Method Methodargumentnotvalidexception
이 잘 발생한다!
자 그런데 이렇게 예외만 뜨면 오케이냐? 당연히 아니다. 예외에서 어떤 필드가 잘못됐는지 알려주긴 하나 우리는 더 원활한 운영, 디버깅을 위해 적절한 정보를 로깅하는 식으로 예외를 핸들링 해야한다.
Spring Kafka의 여러가지 Error Handler
자 일단 spring kafka에서 제공해주는 Error Handler들은 CommonErrorHandler
인터페이스를 구현하고 있다. spring kafka 3.7.1을 기준으로는 7개의 구현체를 제공해주고 있는데 우리는 여기서 DefaultErrorHandler (이 친구가 제일 대중적으로 사용 될 것 같다.) 를 집중적으로 살펴보고 주요 메서드를 알아보고 나머지 Error Handler들을 간단히 살펴보자.
DefaultErrorHandler
자 일단 사용한 예시부터 보자. spring docs의 예시 코드를 사용했다.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
//DefaultErrorHandler 적용 부분
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
log.error("record : " + consumerRecord + "exception : " + e.getCause())
}, new FixedBackOff(0L, 2L));
factory.setCommonErrorHandler(errorHandler);
return factory;
}
spring docs에서도 이런 방식을 사용할 것을 추천하고 있다. 저렇게 로깅하고 싶은 내용을 작성하면 예외가 handler에 들어올 때마다logging이 된다.
실제로 운영을 하다보면 추가적인 정보가 필요할 수도 있겠으나 저렇게만 해도 record 내용이나 offset과 같은 Kafka meta 정보가 모두 보여지고 예외 내용도 잘 보여져서 나는 저 방식으로 사용했다.
그리고 뒤에 인자인 new FixedBackOff는
- 첫 번째 인자는 재시도를 할 때 몇밀리세컨드의 텀을 둘건지
- 두 번째 인자는 재시도를 총 몇 번 할건지
를 결정한다. 내 기억으론 default 값은 0,10으로 알고있다.
참고로 DefaultErrorHandler의 경우 재시도를 해봤자 의미가 없는 예외의 경우 설정이 어떻게 되어있든 바로 끝내버리는 경우가 있다.
- DeserializationException
- MessageConversionException
- ConversionException
- MethodArgumentResolutionException
- NoSuchMethodException
- ClassCastException
DefaultErrorHandler.addNotRetryableException() 해당 방식으로 재시도를 무시할 예외 추가도 가능하다.
CommonErrorHandler 메서드
CommonErrorHandler의 주요 메서드들은
- handleOne -> 이벤트를 하나씩 받을 때 이벤트에 대한 예외처리를 담당한다.
- handleBatch -> 이벤트를 배치로 받을 때 이벤트에 대한 예외처리를 담당한다.
- seekAfterHandler -> 예외가 발생한 이벤트에 대해 예외 처리를하고 true일 경우 재시도 false일 경우 넘어간다.
- handleOtherException -> consumer 외부에서 발생한 예외에 대한 처리를 맡는다.
가 있다, 다른 메서드들도 있는데 일단 주로 봐야 할 것들만 정리했다.
다른 여러 CommonErrorHandler 구현체들
각각의 구현체들을 특성에 맞게 CommonErrorHandler의 메서드들을 구현한다. 대표적으로 어떤 것들이 있는지 보자.
- CommonContainerStoppingErrorHandler
- 이 친구는 예외가 생기면 빠꾸 없이 컨슈머 컨테이너를 꺼버린다.
- 직접 해보니까 어플리케이션을 끄는건 아닌데 컨슈머가 종료되고 더 이상 이벤트를 받지 않는다.
- 이 친구는 웬만하면 사용되지 않을 것 같은데 워낙 흥미로워서 제일 먼저 설명했다.
- CommonMixedErrorHandler
- Batch에 대한 CommonErrorHandler 따로, Single Record에 대한 CommonErrorHnalder따로
- 이름이 굉장히 직관적이다, 각 컨슈밍 방식에 따라 예외 처리를 다르게 하고 싶다면 사용하면 된다.
- CommonDelegatingErrorHandler
- 얘는 예외에 따라 CommonErrorHnadler를 적용할 수 있는 더 섬세한 친구다.
- 얘도 많이 사용하지 않을까? 싶다, 재시도할 가치가 있는 예외와 할 필요없는 예외, 로깅만 해도 되는 예외와 그렇지 않은 예외...
- 지금은 감이 잘 안오지만 컨슈머 어플레케이션을 운영하다보면 이러한 기준이 생기고 구분할 필요가 생길거라 생각한다.
- FallbackBatchErrorHandler
- batch로 컨슈밍을 할 때 실패를 하고 다시 batch로 이벤트들을 받아올 경우 배치 안의 record수와 순서가
보장이 안된다고 한다. - 만약 이게 보장이 되야 한다면 해당 handler를 사용하면 된다. 해당 handler는 in-memory에서 이전 배치를 불러와 이벤트 처리를 한다.
- batch로 컨슈밍을 할 때 실패를 하고 다시 batch로 이벤트들을 받아올 경우 배치 안의 record수와 순서가
이거 말고 다른 것들도 있는데 다른건 직접 알아보자 (귀찮은거 아니고 직접 봐야 더 기억에 잘남는다. 내가 이렇게 나의 소중한 독자들을 위한다.)
결론
개발을 하면서 그리고 글을 작성하면서 이런 컨슈머 어플리케이션의 경우에는 일반적인 api 어플리케이션보다 예외 처리 (재시도, 다른 로직 적용 등등...)나 로깅을 좀 더 신경써야 할 것 같다는 생각이 들었다.
일단 지금은 글에서처럼 로깅을 하고 있지만 (로깅에 담기는 정보가 디버깅 하기에 충분하다는 생각이 들어서) 실제로 운영을 하다보면 더 많은 정보가 필요할 수도 있고 별거 아닌데 과한 로깅을 하고 있게될 수도 있다.
이러한 경험을 토대로 조금씩 내 기준을 만들어서 보다 더 효울적인 예외 처리와 로깅을 할 예정이다!
'Server > kafka' 카테고리의 다른 글
Kafak Producer 설정 값 모음 (0) | 2024.12.01 |
---|