[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aleksandr Sorokoumov reassigned KAFKA-6502:
-------------------------------------------

    Assignee: Aleksandr Sorokoumov  (was: Jagadesh Adireddi)

> Kafka streams deserialization handler not committing offsets on error records
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-6502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6502
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Soby Chacko
>            Assignee: Aleksandr Sorokoumov
>            Priority: Minor
>
> See this Stack Overflow question:
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment:
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
> I am trying to use the LogAndContinueExceptionHandler on deserialization. It
> works fine when an error occurs: the record is logged and processing
> continues. However, when errors arrive continuously, the failed messages do
> not seem to be committed, and they reappear when the application restarts.
> This is more of a problem if I send the failed messages to a dead-letter
> queue (DLQ): on restart, they are sent to the DLQ again. As soon as a good
> record comes in, the offset appears to move forward, and the already-logged
> messages no longer reappear after a restart.
> I reproduced this behavior by running the sample provided here:
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on
> input and reduced the commit interval to 1 second. I also added the
> following to the config:
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
> LogAndContinueExceptionHandler.class);}}
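The reproduction settings described in the report can be sketched as a plain {{java.util.Properties}} object. This is an illustrative sketch, not the reporter's code: it uses the literal string keys behind the {{StreamsConfig}} constants ({{default.value.serde}}, {{commit.interval.ms}}, {{default.deserialization.exception.handler}}) so that it compiles without Kafka on the classpath. The class name {{ReproConfig}}, the application id, and the broker address are placeholders (assumptions).

```java
import java.util.Properties;

// Illustrative sketch of the reproduction settings from the report.
// Keys are the string values of the corresponding StreamsConfig constants,
// so this compiles without the Kafka jars. ReproConfig is a hypothetical name.
class ReproConfig {
    static Properties build() {
        Properties props = new Properties();
        // Placeholder values (assumptions): application id and broker address.
        props.setProperty("application.id", "wordcount-lambda-example");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Value serde switched to Integer to force deserialization errors on input.
        props.setProperty("default.value.serde",
                "org.apache.kafka.common.serialization.Serdes$IntegerSerde");
        // Commit interval reduced to 1 second.
        props.setProperty("commit.interval.ms", "1000");
        // Log records that fail to deserialize, skip them, and keep going.
        props.setProperty("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Kafka's class-typed settings accept either a {{Class}} object, as in the report's {{LogAndContinueExceptionHandler.class}}, or the fully qualified class name as a string, so both forms should select the same handler.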
> It looks like when deserialization exceptions occur, the flag that marks the
> task as needing a commit is never set to true here:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
> It only becomes true once a record is processed successfully. That might be
> why no commit happens, even after I manually call processorContext#commit().

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
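The mechanism suspected in the report can be illustrated with a deliberately simplified, hypothetical model. {{TaskModel}}, {{receive}}, {{maybeCommit}}, and {{run}} are invented names, not Kafka's {{StreamTask}} API. Assumptions: a {{null}} value stands in for a record that failed to deserialize and was skipped by a log-and-continue handler, and a commit is attempted after every record. Because the commit flag is set only after successful processing, a run of bad records leaves the committed position untouched, while a single good record afterwards moves it past all of them.

```java
// Hypothetical, simplified model of the behavior described in KAFKA-6502.
// This is NOT the real StreamTask code; names and structure are illustrative.
class TaskModel {
    private long lastProcessedOffset = -1; // offset of the last successfully processed record
    private boolean commitNeeded = false;  // set only after successful processing
    private long committedPosition = 0;    // where a restarted application would resume

    // A null value stands in for a record that failed to deserialize
    // and was dropped by a log-and-continue style handler.
    void receive(long offset, Integer value) {
        if (value == null) {
            return; // dropped: no offset bookkeeping, commit flag untouched
        }
        lastProcessedOffset = offset;
        commitNeeded = true;
    }

    // Called on each commit interval (or after an explicit commit request).
    void maybeCommit() {
        if (commitNeeded) {
            committedPosition = lastProcessedOffset + 1; // next offset to read
            commitNeeded = false;
        }
    }

    // Feeds records at offsets 0, 1, 2, ... and attempts a commit after each one.
    static long run(Integer[] values) {
        TaskModel task = new TaskModel();
        for (int i = 0; i < values.length; i++) {
            task.receive(i, values[i]);
            task.maybeCommit();
        }
        return task.committedPosition;
    }

    public static void main(String[] args) {
        // Only bad records: nothing is committed, so a restart re-reads all of them.
        System.out.println(run(new Integer[] {null, null, null}));
        // One good record after the bad ones moves the committed position past them.
        System.out.println(run(new Integer[] {null, null, null, 42}));
    }
}
```

Under these assumptions, the first call in {{main}} prints 0 (a restart would re-read, and re-send to a DLQ, all three bad records) and the second prints 4, matching the observation that a good record makes the already-logged failures stop reappearing.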