source

스프링카프카를 사용하는 카프카에 대한 데드 레터 큐(DLQ)

lovecheck 2023. 6. 26. 21:26
반응형

스프링카프카를 사용하는 카프카에 대한 데드 레터 큐(DLQ)

Spring-kafka 2.1.x를 사용하여 Spring-kafka 2.0 응용 프로그램에서 Dead Letter Queue(DLQ) 개념을 구현하여 일부 콩의 @KafkaListener 메서드에 의해 처리되지 못한 모든 메시지를 미리 정의된 Kafka DLQ 항목으로 보내고 단일 메시지를 잃지 않도록 하는 가장 좋은 방법은 무엇입니까?

따라서 소비된 카프카 레코드는 다음 중 하나입니다.

  1. 성공적으로 처리되었습니다.
  2. 처리에 실패하여 DLQ 항목으로 전송됩니다.
  3. 처리하지 못했습니다. 예기치 않은 문제로 인해 DLQ 항목으로 전송되지 않으므로 수신기에서 다시 사용됩니다.

KafkaTemplate를 사용하여 DLQ 항목으로 레코드를 전송하는 ErrorHandler의 사용자 정의 구현으로 수신기 컨테이너를 생성하려고 했습니다.사용하지 않도록 설정된 자동 커밋 및 RECORD Ack 모드 사용.

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

이 구현은 항목 3번을 보장하지 않는 것 같습니다.DlqErrorHandler에서 예외가 발생할 경우 수신기에서 다시 사용되지 않습니다.

트랜잭션 수신기 컨테이너를 사용하는 것이 도움이 됩니까?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

스프링 카프카를 사용하여 DLQ 개념을 구현하는 편리한 방법이 있습니까?

2018/03/28 업데이트

Gary Russell의 답변 덕분에 다음과 같이 DlqErrorHandler를 구현하여 원하는 동작을 달성할 수 있었습니다.

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

소비자 의견조사에서 세 개의 레코드(1, 2, 3)를 반환하고 두 번째 레코드를 처리할 수 없는 경우 다음과 같이 처리합니다.

  • 1개가 처리됩니다.
  • 2는 처리되지 않고 DLQ로 전송됩니다.
  • 3 소비자가 녹음을 추구하는 덕분에. ()offset + 1, 청취자에게 전달될 것입니다.

DLQ로 전송하는 데 실패하면 소비자는 record.offset()을 검색하고 레코드는 수신기에 다시 전달됩니다(그리고 DLQ로 전송하는 것은 아마도 폐기될 것입니다).

2021/04/30 업데이트

Spring Kafka 2.7.0 이후 비차단 재시도 데드 레터 주제가 기본적으로 지원됩니다.

예를 참조하십시오. https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt

재시도는 일반적으로 차단되지 않고(별도의 항목에서 수행됨) 지연되어야 합니다.

  • 실시간 트래픽을 방해하지 않는 것.
  • 통화 수를 증폭시키지 않고, 기본적으로 스팸으로 불량 요청을 처리합니다.
  • (재시도 및 기타 메타데이터의 수를 얻기 위해) 관찰할 수 있습니다.Kafka를 사용하여 비차단 재시도 및 DLT 기능을 수행하려면 일반적으로 추가 항목을 설정하고 해당 수신기를 만들고 구성해야 합니다.Kafka non-blocking retries and DLT

를 참조하십시오.

예외가 발생하면 다음 폴링에서 처리되지 않은 모든 레코드가 다시 전송되도록 소비자를 찾습니다.

동일한 기법(예: 하위 클래스)을 사용하여 DLQ에 기록하고 DLQ 쓰기가 실패한 경우 현재 오프셋(및 처리되지 않은 다른 레코드)을 찾고 DLQ 쓰기가 성공한 경우 나머지 레코드만 검색할 수 있습니다.

편집

DeadLetterPublishingRecoverer이 답변이 게시된 지 몇 달 후에 추가되었습니다.

https://docs.spring.io/spring-kafka/docs/current/reference/html/ #데드레터

언급URL : https://stackoverflow.com/questions/49507709/dead-letter-queue-dlq-for-kafka-with-spring-kafka

반응형