[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17474389#comment-17474389 ]

Aleksandr Sorokoumov commented on KAFKA-6502:
---------------------------------------------

[~jadireddi] Are you actively working on this issue? If not, I would like to give it a try.

> Kafka streams deserialization handler not committing offsets on error records
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6502
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6502
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Soby Chacko
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>
> See this StackOverflow issue:
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment:
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>
> I am trying to use the LogAndContinueExceptionHandler on deserialization. It
> works fine when an error occurs, successfully logging and continuing. However,
> on a continuous stream of errors, these messages are apparently not committed,
> and on a restart of the application they reappear. This is more problematic if
> I try to send the erroneous messages to a DLQ: on a restart, they are sent to
> the DLQ again. As soon as a good record comes in, the offset moves forward and
> I no longer see the already-logged messages after a restart.
>
> I reproduced this behavior by running the sample provided here:
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}}
> to force a deserialization error on input and reduced the commit interval to
> just 1 second. I also added the following to the config:
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);}}
>
> It looks like when deserialization exceptions occur, this flag is never set to
> true here:
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228]
> It only becomes true once processing succeeds. That might be the reason why
> the commit is not happening even after I manually call processorContext#commit().
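For anyone trying to reproduce the report, here is a minimal sketch of the setup described above. The application id, bootstrap server, and input topic name are illustrative assumptions; the mismatched Integer value serde against String-encoded input is what forces the deserialization errors:

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class DeserializationErrorRepro {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "deser-error-repro");   // hypothetical app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // The input topic actually holds String values, so every record fails to deserialize.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        // Commit every second, as in the report.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Log the bad record and keep going instead of failing the task.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);

        final StreamsBuilder builder = new StreamsBuilder();
        // A no-op topology is enough: the poison pills never get past deserialization.
        builder.stream("plaintext-input").foreach((key, value) -> { });

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
{code}

With the topic full of such poison pills, the handler logs and skips every record, but, per this ticket, their offsets are never committed, so they are all replayed (or re-sent to a DLQ) after a restart.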
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121269#comment-17121269 ]

Guozhang Wang commented on KAFKA-6502:
--------------------------------------

I think it is not fixed via KAFKA-6607: if deserialization fails, the record is not put into the buffer at all, and hence consumedOffsets would not be updated.
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121235#comment-17121235 ]

Matthias J. Sax commented on KAFKA-6502:
----------------------------------------

[~guozhang] I think this issue is fixed via https://issues.apache.org/jira/browse/KAFKA-6607 ?
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366010#comment-16366010 ]

Matthias J. Sax commented on KAFKA-6502:
----------------------------------------

If we really change the timestamp handling and only look at the first record per partition to determine which record to pick next, this issue would resolve itself, as we could do lazy deserialization for the first record of each partition only.
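To make the idea concrete, here is a rough sketch of what such lazy head-of-queue deserialization could look like. The queue class, its hooks, and the holder type below are hypothetical, not the actual RecordQueue API:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

/**
 * Sketch: records stay raw in the queue; only the head is deserialized when
 * its timestamp is needed to pick the next partition. A poison pill is
 * skipped on the spot, so its offset becomes committable without ever
 * reaching StreamTask.process().
 */
class LazyPartitionQueue<K, V> {
    private final Deque<ConsumerRecord<byte[], byte[]>> raw = new ArrayDeque<>();
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private long committableOffset = -1L;  // highest offset processed or skipped

    LazyPartitionQueue(final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) {
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

    void addRawRecord(final ConsumerRecord<byte[], byte[]> record) {
        raw.addLast(record);
    }

    /** Returns the deserialized head record, lazily skipping poison pills. */
    KeyValueAtOffset<K, V> deserializedHead() {
        while (!raw.isEmpty()) {
            final ConsumerRecord<byte[], byte[]> head = raw.peekFirst();
            try {
                final K key = keyDeserializer.deserialize(head.topic(), head.key());
                final V value = valueDeserializer.deserialize(head.topic(), head.value());
                return new KeyValueAtOffset<>(key, value, head.offset());
            } catch (final RuntimeException e) {
                // The deserialization handler would run here (log-and-continue);
                // the skipped offset is immediately safe to commit.
                raw.pollFirst();
                committableOffset = head.offset();
            }
        }
        return null;
    }

    long committableOffset() {
        return committableOffset;
    }

    /** Minimal holder type, defined here to keep the sketch self-contained. */
    static final class KeyValueAtOffset<K, V> {
        final K key; final V value; final long offset;
        KeyValueAtOffset(final K key, final V value, final long offset) {
            this.key = key; this.value = value; this.offset = offset;
        }
    }
}
{code}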
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365057#comment-16365057 ]

Guozhang Wang commented on KAFKA-6502:
--------------------------------------

There are some tricky cases worth pointing out when designing the fix. Say you have offsets 0,1,2,3,4,5, deserialization failed on the records at offsets 1 and 3 but succeeded on the others, and the successfully deserialized records have not been processed yet but are only sitting in the partition queue. Then, when you commit, you have to consider which offset to commit to:

* after record 0 is processed, you can commit up to offset 1 (since 1 failed deserialization and was skipped already);
* after record 2 is processed, you can commit up to offset 3 (since 3 failed deserialization and was skipped already);
* after record 4 is processed, you can commit up to offset 4;
* after record 5 is processed, you can commit up to offset 5.
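In other words, the committable position is the last processed offset, rolled forward through any directly following skipped offsets. A toy sketch of that bookkeeping (the class and method names are made up for illustration, not Streams internals):

{code:java}
import java.util.TreeSet;

/**
 * Sketch of the commit logic described above: offsets whose records failed
 * deserialization are "skipped", and the committable position is the last
 * processed offset plus any contiguous run of skipped offsets after it.
 */
public class CommittableOffsetTracker {
    private final TreeSet<Long> skipped = new TreeSet<>();
    private long lastProcessed = -1L;

    void markSkipped(final long offset) {      // deserialization failed, record dropped
        skipped.add(offset);
    }

    void markProcessed(final long offset) {    // record fully processed
        lastProcessed = offset;
    }

    /** Highest offset that can be committed as consumed (-1 if none yet). */
    long committable() {
        long position = lastProcessed;
        while (skipped.contains(position + 1)) {
            position++;                        // roll forward over trailing skipped offsets
        }
        return position;
    }

    public static void main(final String[] args) {
        final CommittableOffsetTracker t = new CommittableOffsetTracker();
        t.markSkipped(1);
        t.markSkipped(3);
        t.markProcessed(0);
        System.out.println(t.committable());   // 1: offset 1 was skipped already
        t.markProcessed(2);
        System.out.println(t.committable());   // 3: offset 3 was skipped already
        t.markProcessed(4);
        System.out.println(t.committable());   // 4
        t.markProcessed(5);
        System.out.println(t.committable());   // 5
    }
}
{code}

Since Kafka's committed offset denotes the next offset to fetch, an actual commit call would use committable() + 1.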
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358884#comment-16358884 ]

Guozhang Wang commented on KAFKA-6502:
--------------------------------------

Got it, thanks! I think this is a valid point; we will keep it tracked as a known issue and consider a fix based on priorities.
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358638#comment-16358638 ]

Soby Chacko commented on KAFKA-6502:
------------------------------------

Hi Guozhang, your reasoning is correct. However, if I introduce a custom DLQ-type exception handler (in which case a bad message is sent to an error topic), then if I happen to restart the application, I don't want the poison pills to be re-sent to the DLQ, as they have already been sent. Anyway, this is a corner case I ran into and may not happen frequently. Thanks!
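For context, such a DLQ handler would look roughly like the sketch below. The DeserializationExceptionHandler interface is the real Streams API, but the dead-letter topic name and the producer wiring are illustrative assumptions:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Sketch of a send-to-DLQ handler. Note the issue described in this ticket:
 * because skipped offsets are not committed until a good record is
 * processed, these sends are repeated after a restart.
 */
public class DeadLetterQueueExceptionHandler implements DeserializationExceptionHandler {

    private Producer<byte[], byte[]> dlqProducer;
    private String dlqTopic;

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Forward the raw bytes so the bad payload can be inspected later.
        dlqProducer.send(new ProducerRecord<>(dlqTopic, record.key(), record.value()));
        return DeserializationHandlerResponse.CONTINUE;  // skip the record and keep running
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        dlqTopic = "error-topic";  // hypothetical DLQ topic
        final Map<String, Object> producerConfig = new HashMap<>();
        // Assumes Streams passes its own config map (including bootstrap.servers) here.
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                           configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG));
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        dlqProducer = new KafkaProducer<>(producerConfig);
    }
}
{code}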
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16351051#comment-16351051 ]

Guozhang Wang commented on KAFKA-6502:
--------------------------------------

Hello Soby, thanks for reporting this issue. Your reasoning is basically right: a record is first deserialized, then put into the buffer, and then processed. If deserialization fails, the record will not be processed at all, and if there is a series of records with deserialization errors, none of them will be processed, i.e. {{StreamTask.process()}} will not be invoked at all.

Before we jump onto possible fixes, could I ask about your scenario: when you see the skipped-records metric increasing, why do you want to restart your application immediately, in the middle of such a series of error messages? Would you expect no valid records to ever come next? My reasoning is that:

1. If these poison pills are due to a bug in your application's own serde, then after restarting with the fixed serde these records should be processed correctly, so we are good here.
2. If these poison pills are bad in themselves and cannot be processed anyway, you would not bother restarting your application; you can just let it continue to run and skip these records.
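A toy model of that flow, with illustrative names rather than the actual StreamTask internals, shows why a run of poison pills leaves the commit-enabling flag untouched:

{code:java}
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/**
 * Toy model: deserialization happens on the way into the buffer, so a poison
 * pill never reaches process(), and the flag that makes the next commit
 * happen is never set.
 */
public class DeserThenBufferThenProcess {

    private static boolean commitOffsetNeeded = false;
    private static final Queue<Integer> buffer = new ArrayDeque<>();

    private static Integer deserialize(final String raw) {
        return Integer.parseInt(raw);  // stand-in for a value serde
    }

    private static void process(final int value) {
        commitOffsetNeeded = true;     // only successful processing flips the flag
    }

    public static void main(final String[] args) {
        final List<String> polled = Arrays.asList("oops", "still-bad");  // two poison pills
        for (final String raw : polled) {
            try {
                buffer.add(deserialize(raw));
            } catch (final NumberFormatException e) {
                // log-and-continue: the record is skipped and never buffered
            }
        }
        while (!buffer.isEmpty()) {
            process(buffer.poll());
        }
        // Prints false: nothing was processed, so nothing will be committed.
        System.out.println("commitOffsetNeeded = " + commitOffsetNeeded);
    }
}
{code}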