[ https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna updated KAFKA-12508:
----------------------------------
    Fix Version/s: 2.6.2
                   2.7.1
                   2.8.0

> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-12508
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12508
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0, 2.6.1
>            Reporter: Nico Habermann
>            Assignee: Bruno Cadonna
>            Priority: Blocker
>             Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] added emit-on-change semantics to KTables that suppress updates for duplicate values.
> However, this may cause data loss in at_least_once topologies when records are retried from the last commit due to an error, a restart, etc.
>
> Consider the following example:
> {code:java}
> streams.table(source, materialized)
>     .toStream()
>     .map(mayThrow())
>     .to(output);
> {code}
>
> # Record A gets read
> # Record A is stored in the table
> # The update for record A is forwarded through the topology
> # map() throws (or, alternatively, any restart happens while the forwarded update is still being processed and has not yet been produced to the output topic)
> # The stream is restarted and "retries" from the last commit
> # Record A gets read again
> # The table discards the update for record A because
> ## The value is the same
> ## The timestamp is the same
> # Eventually the stream commits
> # There is absolutely no output for record A, even though we're running in at_least_once
>
> This behaviour does not seem intentional.
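
A minimal sketch, in Java, of why step 7 drops the replayed record. It models the emit-on-change duplicate check as the issue describes it (a same-value update is suppressed unless its timestamp is older) and compares it with the proposed "older or equal" variant. The class `EmitOnChangeSketch`, the `Entry` type, and both `shouldForward*` methods are hypothetical: they work on plain values and timestamps rather than the serialized bytes that Kafka Streams actually compares, and they ignore tombstones.

```java
import java.util.Objects;

// Hypothetical model of the emit-on-change check discussed in KAFKA-12508.
// It compares plain (value, timestamp) pairs, not serialized store bytes,
// and ignores tombstones (null values) for brevity.
public class EmitOnChangeSketch {

    // Hypothetical stand-in for a table entry: a value plus its record timestamp.
    public static final class Entry {
        final String value;
        final long timestamp;

        public Entry(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Rule as described in the issue: a same-value update is suppressed
    // unless its timestamp is strictly older than the stored one.
    public static boolean shouldForwardCurrent(Entry stored, Entry update) {
        if (stored == null) {
            return true; // first value for this key is always forwarded
        }
        boolean sameValue = Objects.equals(stored.value, update.value);
        boolean olderTimestamp = update.timestamp < stored.timestamp;
        return !sameValue || olderTimestamp;
    }

    // Proposed change: also forward a same-value update whose timestamp is equal.
    public static boolean shouldForwardProposed(Entry stored, Entry update) {
        if (stored == null) {
            return true; // first value for this key is always forwarded
        }
        boolean sameValue = Objects.equals(stored.value, update.value);
        boolean olderOrEqualTimestamp = update.timestamp <= stored.timestamp;
        return !sameValue || olderOrEqualTimestamp;
    }

    public static void main(String[] args) {
        // Step 2: record A (timestamp 1000) is written to the table.
        Entry stored = new Entry("A", 1000L);
        // Steps 5-6: after the restart, record A is read again from the last
        // committed offset, with the same value and the same timestamp.
        Entry replayed = new Entry("A", 1000L);

        // prints "current rule forwards replay: false"
        System.out.println("current rule forwards replay: " + shouldForwardCurrent(stored, replayed));
        // prints "proposed rule forwards replay: true"
        System.out.println("proposed rule forwards replay: " + shouldForwardProposed(stored, replayed));
    }
}
```

The trade-off in this model: the "older or equal" rule makes the replay visible downstream, but it also forwards a genuine duplicate that arrives with an identical timestamp, so emit-on-change would suppress fewer records than before.
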
> [The emit-on-change logic explicitly forwards records that have the same value and an older timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
> This logic should probably be changed to also forward updates that have an older *or equal* timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)