[
https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nico Habermann updated KAFKA-12508:
-----------------------------------
Description:
[KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
added emit-on-change semantics to KTables that suppress updates for duplicate
values.
However, this can cause data loss in at_least_once topologies when records are
retried from the last committed offset after an error, a restart, etc.
Consider the following example:
{code:java}
streams.table(source, materialized)
       .toStream()
       .map(mayThrow())
       .to(output);
{code}
# Record A gets read
# Record A is stored in the table
# The update for record A is forwarded through the topology
# map() throws (or, alternatively, the application restarts while the forwarded
update is still being processed and has not yet been produced to the output topic)
# The stream is restarted and "retries" from the last commit
# Record A gets read again
# The table will discard the update for record A because
## The value is the same
## The timestamp is the same
# Eventually the stream will commit
# No output is ever produced for record A, even though we are running with
at_least_once guarantees
This behaviour does not seem intentional. [The emit-on-change logic explicitly
forwards records that have the same value and an older
timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
This logic should probably be changed to also forward updates that have an
older *or equal* timestamp.
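The suppression decision and the proposed fix can be sketched as follows. This is a minimal standalone model, not the actual Kafka code; the class and method names, and the sameValue/timestamp parameters, are illustrative only:

```java
// Minimal sketch of the KIP-557 suppression check described above
// (illustrative only; the real logic lives in ValueAndTimestampSerializer).
public class EmitOnChangeSketch {

    // Current behaviour: a same-value update is forwarded only if its
    // timestamp is strictly older than the stored timestamp.
    static boolean forwardCurrent(long storedTs, long newTs, boolean sameValue) {
        if (!sameValue) {
            return true;          // value changed: always emit
        }
        return newTs < storedTs;  // same value: emit only if strictly older
    }

    // Proposed behaviour: also forward when the timestamps are equal,
    // which is exactly the retry case (same value, same timestamp).
    static boolean forwardProposed(long storedTs, long newTs, boolean sameValue) {
        if (!sameValue) {
            return true;
        }
        return newTs <= storedTs;
    }

    public static void main(String[] args) {
        // Retry after restart: record A is re-read with an identical
        // value and an identical timestamp.
        System.out.println(forwardCurrent(100L, 100L, true));   // dropped: update lost
        System.out.println(forwardProposed(100L, 100L, true));  // forwarded: update re-emitted
    }
}
```

Under this model, the retried record (same value, same timestamp) is silently dropped by the current check but re-emitted by the proposed one, while genuinely duplicate updates with a newer timestamp remain suppressed in both.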
> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---------------------------------------------------------------------------
>
> Key: KAFKA-12508
> URL: https://issues.apache.org/jira/browse/KAFKA-12508
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0, 2.6.1
> Reporter: Nico Habermann
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)