[ https://issues.apache.org/jira/browse/KAFKA-7285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7285:
-----------------------------------
    Fix Version/s: 1.1.2

> Streams should be more fencing-sensitive during task suspension under EOS
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-7285
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7285
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> When EOS is turned on, Streams performs the following steps:
> 1. InitTxn in task creation.
> 2. BeginTxn in topology initialization.
> 3. AbortTxn in clean shutdown.
> 4. CommitTxn in commit(), which is also called in suspend().
>
> Now consider the following situation, with two threads (Ta and Tb) and one task:
> 1. Originally, Ta owns the task and the consumer generation is 1.
> 2. Ta becomes unresponsive and fails to send heartbeats, so it is kicked out of the group and a new generation 2 is formed with Tb in it. The task is migrated to Tb without Ta knowing.
> 3. Ta finally calls `consumer.poll`, becomes aware of the rebalance, and re-joins the group, forming a new generation 3. During this rebalance, the leader decides to assign the task back to Ta.
> 4.a) Ta calls onPartitionsRevoked on the task, suspending it and calling commit. However, if no data has been sent since `BeginTxn`, this commit call becomes a no-op.
> 4.b) Ta then calls onPartitionsAssigned on the task, resuming it, and calls BeginTxn. It then encounters a ProducerFencedException, incorrectly, since Ta is once again the legitimate owner of the task.
>
> The root cause is that Ta does not call InitTxn again to claim "I'm the newest producer for this txnId, and I'm going to fence everyone else with the same txnId", so it is mistakenly treated as an older client than Tb.
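The root cause hinges on epoch-based fencing per transactional id: every InitTxn bumps the epoch, and any request carrying an older epoch is rejected. The Java snippet below is a toy simulation of that mechanism, not Kafka's producer or coordinator code. The classes `ToyTxnCoordinator` and `ToyProducer`, the id `app-0_0`, and the rule that an empty commit never contacts the coordinator are all assumptions for illustration (the last one mirrors step 4.a). In this model the stale epoch only surfaces once Ta talks to the coordinator again, here on its first send after resuming.

```java
import java.util.HashMap;
import java.util.Map;

public class FencingSketch {

    /** Toy stand-in for a fencing error; not Kafka's ProducerFencedException. */
    static class ToyProducerFencedException extends RuntimeException {
        ToyProducerFencedException(String msg) { super(msg); }
    }

    /** Hypothetical coordinator: tracks the latest epoch per transactional id. */
    static class ToyTxnCoordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        /** Models InitTxn: bump the epoch, implicitly fencing all older holders of txnId. */
        int init(String txnId) {
            return epochs.merge(txnId, 1, Integer::sum);
        }

        /** Rejects any request that carries a stale epoch. */
        void check(String txnId, int epoch) {
            int current = epochs.getOrDefault(txnId, 0);
            if (epoch < current) {
                throw new ToyProducerFencedException(
                        txnId + ": epoch " + epoch + " < current " + current);
            }
        }
    }

    /** Hypothetical producer bound to one transactional id (one per task). */
    static class ToyProducer {
        private final ToyTxnCoordinator coordinator;
        private final String txnId;
        private int epoch = 0;
        private boolean sentInTxn = false;

        ToyProducer(ToyTxnCoordinator coordinator, String txnId) {
            this.coordinator = coordinator;
            this.txnId = txnId;
        }

        void initTxn() { epoch = coordinator.init(txnId); }

        void beginTxn() { sentInTxn = false; } // purely local in this model

        void send() {
            coordinator.check(txnId, epoch); // contacting the coordinator exposes a stale epoch
            sentInTxn = true;
        }

        void commitTxn() {
            // Assumption mirroring step 4.a: an empty transaction never reaches
            // the coordinator, so a stale epoch goes unnoticed here.
            if (sentInTxn) {
                coordinator.check(txnId, epoch);
            }
            sentInTxn = false;
        }
    }

    public static void main(String[] args) {
        ToyTxnCoordinator coordinator = new ToyTxnCoordinator();
        String txnId = "app-0_0"; // illustrative transactional id

        ToyProducer ta = new ToyProducer(coordinator, txnId);
        ta.initTxn();   // generation 1: Ta owns the task (epoch 1)
        ta.beginTxn();

        ToyProducer tb = new ToyProducer(coordinator, txnId);
        tb.initTxn();   // generation 2: task migrated to Tb (epoch 2), Ta is fenced

        ta.commitTxn(); // generation 3, step 4.a: empty commit is a no-op
        ta.beginTxn();  // step 4.b: resume without calling initTxn again
        try {
            ta.send();
            System.out.println("not fenced");
        } catch (ToyProducerFencedException e) {
            System.out.println("fenced: " + e.getMessage());
        }
    }
}
```

Running `main` prints `fenced: app-0_0: epoch 1 < current 2`: Ta legitimately owns the task again, yet its producer still carries the epoch from generation 1.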
>
> Note that this issue is not common: it requires a transaction that did not send any data at all, so that its commitTxn call is a no-op and the producer is therefore not fenced earlier.
>
> One proposal for this issue is to close the producer and recreate a new one in `suspend` after the commitTxn call succeeds and `startNewTxn` is false, so that the new producer always calls `initTxn` and fences the others.
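The proposal can be sketched with the same kind of toy model, repeated here so the snippet stands alone. This is one hedged reading of the proposal, not the actual Streams patch: `Task`, `Producer`, `Coordinator` and the `producerFactory` supplier are hypothetical. `suspend()` replaces the producer after a commit with `startNewTxn == false`, and `resume()` calls `initTxn` on the fresh producer before `beginTxn`, so Ta re-claims the txnId and it is Tb that gets fenced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SuspendRecreateSketch {

    static class FencedException extends RuntimeException {
        FencedException(String msg) { super(msg); }
    }

    /** Hypothetical coordinator: latest epoch per transactional id. */
    static class Coordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        int init(String txnId) { return epochs.merge(txnId, 1, Integer::sum); }

        void check(String txnId, int epoch) {
            int current = epochs.getOrDefault(txnId, 0);
            if (epoch < current) {
                throw new FencedException(txnId + ": epoch " + epoch + " < current " + current);
            }
        }
    }

    /** Hypothetical transactional producer; an empty commit never reaches the coordinator. */
    static class Producer {
        private final Coordinator coordinator;
        private final String txnId;
        private int epoch = 0;
        private boolean sentInTxn = false;

        Producer(Coordinator coordinator, String txnId) {
            this.coordinator = coordinator;
            this.txnId = txnId;
        }

        void initTxn() { epoch = coordinator.init(txnId); }
        void beginTxn() { sentInTxn = false; }
        void send() { coordinator.check(txnId, epoch); sentInTxn = true; }
        void commitTxn() {
            if (sentInTxn) coordinator.check(txnId, epoch);
            sentInTxn = false;
        }
        void close() { /* nothing to release in this model */ }
    }

    /** Hypothetical task showing one reading of the proposed suspend()/resume() change. */
    static class Task {
        private final Supplier<Producer> producerFactory;
        private Producer producer;

        Task(Supplier<Producer> producerFactory) {
            this.producerFactory = producerFactory;
            producer = producerFactory.get();
            producer.initTxn();  // step 1: InitTxn in task creation
            producer.beginTxn(); // step 2: BeginTxn in topology initialization
        }

        void commit(boolean startNewTxn) {
            producer.commitTxn();
            if (startNewTxn) producer.beginTxn();
        }

        void suspend() {
            commit(false);
            // Proposed change: the commit succeeded and startNewTxn is false,
            // so swap in a fresh producer that has not claimed the txnId yet.
            producer.close();
            producer = producerFactory.get();
        }

        void resume() {
            producer.initTxn();  // always re-claim the txnId, fencing older holders
            producer.beginTxn();
        }

        void process() { producer.send(); }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        String txnId = "app-0_0"; // illustrative transactional id
        Supplier<Producer> factory = () -> new Producer(coordinator, txnId);

        Task onTa = new Task(factory); // generation 1: Ta owns the task (epoch 1)
        Task onTb = new Task(factory); // generation 2: migrated to Tb (epoch 2)

        onTa.suspend();                // generation 3: empty commit is still a no-op
        onTa.resume();                 // fresh producer calls initTxn (epoch 3)
        onTa.process();                // succeeds: Ta holds the latest epoch

        try {
            onTb.process();
            System.out.println("Tb not fenced");
        } catch (FencedException e) {
            System.out.println("Tb fenced: " + e.getMessage());
        }
    }
}
```

In this model the fencing direction flips: whichever thread resumed the task most recently holds the highest epoch. The trade-off is an extra producer close/create and an InitTxn round trip on every suspend/resume cycle.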