[
https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023155#comment-16023155
]
Eno Thereska commented on KAFKA-5315:
-------------------------------------
[~mjsax] isn't this a fundamental problem that exactly-once and transactions
are supposed to solve? Falls into the general category of "produce to a bunch
of topics + state store" atomically. What am I missing?
> Streams exception w/ partially processed record corrupts state store
> --------------------------------------------------------------------
>
> Key: KAFKA-5315
> URL: https://issues.apache.org/jira/browse/KAFKA-5315
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: Mathieu Fenniak
> Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is
> inserted into the state store before being forwarded to downstream
> processors, and may persist in the state store even if downstream processing
> fails due to an exception. The persisted state store record may later affect
> any attempt to restart processing after the exception.
> Specific example series of events in a simple topology: a single KTable
> source, group by a field in the value, aggregate that adds up another field,
> output to a topic --
> 1. A new record (record A) is received by the source KTable, and put in the
> KTable RocksDB state store.
> 2. While processing record A, an exception occurs that prevents producing to
> Kafka (e.g., a TimeoutException: "Failed to update metadata after 60000
> ms").
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed. Record A is now in the local
> state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the
> task.
> 7. The new thread initializes its state store for the KTable, but since it
> is on the same host as the original thread, the store still contains the k/v
> for record A.
> 8. The new thread resumes consuming at the last committed offset, which is
> before record A.
> 9. When processing record A, the new thread reads, by record A's key, the
> value that was written to the state store in step #1.
> 10. The repartition map receives a Change with both an oldValue and a
> newValue, and forwards a Change(null, v) and a Change(v, null).
> 11. The aggregation ends up both subtracting and adding the value of record
> A, resulting in an incorrect & persistent output.
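The double count in steps #9-#11 can be sketched with a small, self-contained simulation (plain Python, not the Kafka Streams API; the store, the `process`/`forward_change` helpers, and the record contents are simplified stand-ins, not code from the project):

```python
# Simplified stand-ins for the KTable state store and the downstream
# aggregation state (real Kafka Streams uses RocksDB and changelog topics).
ktable_store = {}  # key -> value, updated BEFORE forwarding (step #1)
agg_store = {}     # group key -> running sum

def process(key, value):
    # KTable source: read the old value, put the new one into the store,
    # then forward a Change(oldValue, newValue) downstream.
    old = ktable_store.get(key)
    ktable_store[key] = value
    forward_change(old, value)

def forward_change(old, new):
    # Repartition + aggregate: subtract the old value, add the new one.
    if old is not None:
        g = old["group"]
        agg_store[g] = agg_store.get(g, 0) - old["amount"]
    if new is not None:
        g = new["group"]
        agg_store[g] = agg_store.get(g, 0) + new["amount"]

record_a = {"group": "g", "amount": 5}

# First attempt (steps #1-#4): the put into the state store succeeds, but
# an exception is thrown before the forwarded Change reaches the aggregation.
ktable_store["k1"] = record_a

# After the rebalance (steps #5-#9): the offset rewinds and record A is
# reprocessed against the surviving store, whose old value is record A itself.
process("k1", record_a)

# Steps #10-#11: the aggregation both subtracts and adds record A's amount.
print(agg_store)  # {'g': 0} -- the correct result would be {'g': 5}
```

Because the +5 from the failed first attempt never reached the aggregation, the reprocessed record nets to zero instead of five, which is the "incorrect & persistent output" of step #11.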
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)