[ https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023155#comment-16023155 ]
Eno Thereska commented on KAFKA-5315:
-------------------------------------

[~mjsax] isn't this a fundamental problem that exactly-once and transactions are supposed to solve? It falls into the general category of "produce to a bunch of topics + state store" atomically. What am I missing?

> Streams exception w/ partially processed record corrupts state store
> --------------------------------------------------------------------
>
>                 Key: KAFKA-5315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Mathieu Fenniak
>            Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is
> inserted into the state store before being forwarded to downstream
> processors, and it may persist in the state store even if downstream
> processing fails due to an exception. The persisted state-store record may
> then affect any attempt to restart processing after the exception.
>
> Specific example series of events in a simple topology: a single KTable
> source, group by a field in the value, aggregate that adds up another field,
> output to a topic:
>
> 1. A new record (record A) is received by the source KTable and put in the
> KTable RocksDB state store.
> 2. While processing record A, an exception prevents producing to Kafka
> (e.g., a TimeoutException: Failed to update metadata after 60000 ms).
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed. Record A is now in the local
> state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the
> task.
> 7. The new thread initializes its state store for the KTable, but because it
> is on the same host as the original thread, the store still contains the k/v
> for record A.
> 8. The new thread resumes consuming at the last committed offset, which is
> before record A.
> 9. When processing record A, the new thread reads the value that was written
> to the state store in step #1 under record A's key.
> 10. The repartition map receives a Change with both an oldValue and a
> newValue, and forwards a Change(null, v) and a Change(v, null).
> 11. The aggregation ends up both subtracting and adding the value of record
> A, resulting in incorrect and persistent output.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
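To make the failure sequence above concrete, here is a minimal Python simulation of steps 1-11 (not Kafka code; record format, names, and single-threaded semantics are simplified assumptions). The key point it models is the ordering from step #1: the state-store write happens before downstream forwarding succeeds, so a crash-and-replay sees the record's own value as "old state" and both subtracts and adds it.

```python
# Minimal simulation of KAFKA-5315: the KTable writes to its state store
# before downstream processing succeeds, so a crash-and-replay re-reads the
# record's own value as "old state" and cancels it out of the aggregate.

state_store = {}  # stands in for the KTable's local RocksDB store
totals = {}       # stands in for the downstream aggregation's result

def forward(key, new, old):
    """Repartition map + aggregate: subtract the old value's amount from its
    group (Change(v, null)), add the new value's amount (Change(null, v))."""
    if old is not None:
        totals[old[0]] = totals.get(old[0], 0) - old[1]
    if new is not None:
        totals[new[0]] = totals.get(new[0], 0) + new[1]

def process(key, value, fail_downstream=False):
    """Process one record the way the source KTable does: the state-store
    write happens BEFORE downstream forwarding (the root cause here)."""
    old = state_store.get(key)
    state_store[key] = value
    if fail_downstream:
        # Simulated produce failure, e.g. "TimeoutException: Failed to
        # update metadata after 60000 ms" (steps 2-3).
        raise RuntimeError("produce to Kafka failed")
    forward(key, value, old)

record_a = ("group-1", 5)  # value = (group-by field, amount to aggregate)

try:
    process("a", record_a, fail_downstream=True)  # steps 1-2: store written, produce fails
except RuntimeError:
    pass  # steps 3-8: thread dies; a same-host thread reuses the dirty store

process("a", record_a)  # step 9: replay from the last committed offset
print(totals)           # {'group-1': 0} -- the correct total would be 5
```

Under these simplified assumptions, the replayed record nets to zero instead of 5, matching steps 10-11: because the stale store entry survived the crash, the subtractor and adder cancel, and the wrong total persists in the output.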