[ https://issues.apache.org/jira/browse/KAFKA-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16023155#comment-16023155 ]

Eno Thereska commented on KAFKA-5315:
-------------------------------------

[~mjsax] isn't this a fundamental problem that exactly-once and transactions 
are supposed to solve? It falls into the general category of "produce to a 
bunch of topics + state store" atomically. What am I missing?
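
For concreteness, a rough sketch of the configuration I have in mind, assuming 
the processing.guarantee / EXACTLY_ONCE constants that the EOS work adds to 
StreamsConfig; the application id and bootstrap servers below are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceConfigSketch {

    // Builds Streams config with the exactly-once processing guarantee enabled.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-aggregation-app"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder
        // With exactly-once enabled, state store updates, changelog/repartition writes,
        // and produces to output topics are committed atomically, so a failed produce
        // should not leave a half-applied record behind across a restart.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
{code}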

> Streams exception w/ partially processed record corrupts state store
> --------------------------------------------------------------------
>
>                 Key: KAFKA-5315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: Mathieu Fenniak
>            Assignee: Matthias J. Sax
>
> When processing a topic record in a Kafka Streams KTable, the record is 
> inserted into the state store before being forwarded to downstream 
> processors, and it may persist in the state store even if downstream 
> processing fails due to an exception.  The persisted state store record may 
> later affect any attempt to restart processing after the exception.
> A specific sequence of events in a simple topology (a single KTable source, 
> a group-by on a field in the value, an aggregation that adds up another 
> field, output to a topic; a sketch follows the numbered steps) --
> 1. A new record (record A) is received by the source KTable, and put in the 
> KTable RocksDB state store.
> 2. While processing record A, an exception occurs that prevents producing to 
> Kafka (e.g., a TimeoutException: "Failed to update metadata after 60000 ms").
> 3. The stream thread throws an unhandled exception and stops.
> 4. The state stores are closed and flushed.  Record A is now in the local 
> state store.
> 5. The consumer group rebalances.
> 6. A different thread, in the same process, on the same host, picks up the 
> task.
> 7. The new thread initializes its state store for the KTable; because it is 
> on the same host as the original thread, the store still contains the k/v 
> for record A.
> 8. The new thread resumes consuming at the last committed offset, which is 
> before record A.
> 9. When processing record A, the new thread reads, by record A's key, the 
> value that was written to the state store in step #1.
> 10. The repartition map receives a Change with both an oldValue and a 
> newValue, and forwards a Change(null, v) and a Change(v, null).
> 11. The aggregation ends up both subtracting and adding the value of record 
> A, resulting in incorrect and persistent output.
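>
> For concreteness, a minimal sketch of such a topology (0.10.2-era DSL; the 
> Order value class, its getters, and the topic/store names are hypothetical 
> stand-ins, and serde configuration for Order is omitted):
>
> {code:java}
> import java.util.Properties;
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KTable;
>
> public class KTableAggregationSketch {
>
>     // Hypothetical value type standing in for the actual record schema.
>     public static class Order {
>         String customer;
>         long amount;
>         String getCustomer() { return customer; }
>         long getAmount() { return amount; }
>     }
>
>     public static void main(String[] args) {
>         KStreamBuilder builder = new KStreamBuilder();
>
>         // Single KTable source; step 1 puts record A into this store before forwarding.
>         KTable<String, Order> orders = builder.table("orders-topic", "orders-store");
>
>         KTable<String, Long> totals = orders
>             // Group by a field in the value; the repartition map from step 10 sits behind this.
>             .groupBy((orderId, order) -> KeyValue.pair(order.getCustomer(), order))
>             // Aggregation that adds up another field; its adder/subtractor pair is what
>             // processes the Change(null, v) / Change(v, null) forwards in steps 10-11.
>             .aggregate(
>                 () -> 0L,
>                 (customer, order, total) -> total + order.getAmount(),  // adder
>                 (customer, order, total) -> total - order.getAmount(),  // subtractor
>                 Serdes.Long(),
>                 "totals-store");
>
>         // Output to a topic; the produce that fails in step 2 targets topics like this one.
>         totals.to(Serdes.String(), Serdes.Long(), "totals-topic");
>
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-aggregation-app");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         new KafkaStreams(builder, props).start();
>     }
> }
> {code}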



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
