[ 
https://issues.apache.org/jira/browse/KAFKA-7285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7285:
-----------------------------------
    Fix Version/s: 1.1.2

> Streams should be more fencing-sensitive during task suspension under EOS
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-7285
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7285
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>            Priority: Major
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> When EOS is turned on, Streams performs the following steps:
> 1. InitTxn in task creation.
> 2. BeginTxn in topology initialization.
> 3. AbortTxn in clean shutdown.
> 4. CommitTxn in commit(), which is called in suspend() as well.
> Now consider this situation, with two threads (Ta) and (Tb) and one task:
> 1. originally Ta owns the task, consumer generation is 1.
> 2. Ta becomes unresponsive and stops sending heartbeats, so it gets kicked 
> out of the group. A new generation 2 is formed with Tb in it, and the task 
> is migrated to Tb without Ta knowing.
> 3. Ta finally calls `consumer.poll` and becomes aware of the rebalance. It 
> re-joins the group, forming a new generation 3, and during the rebalance 
> the leader decides to assign the task back to Ta.
> 4.a) Ta calls onPartitionsRevoked on the task, suspending it and calling 
> commit. However, if no data has been sent since `BeginTxn`, this commit call 
> becomes a no-op.
> 4.b) Ta then calls onPartitionsAssigned on the task, resuming it, and then 
> calls BeginTxn. It then incorrectly encounters a ProducerFencedException.
> The root cause is that Ta does not trigger InitTxn to claim "I'm the newest 
> for this txnId, and am going to fence everyone else with the same txnId", so 
> it is mistakenly treated as an older client than Tb.
> Note that this issue is not common, since it requires a txn that did not 
> send any data at all, which makes its commitTxn call a no-op so that Ta is 
> not fenced earlier on.
> One proposal for this issue is to close the producer and create a new one 
> in `suspend` after the commitTxn call has succeeded and `startNewTxn` is 
> false, so that the new producer will always `initTxn` to fence others.
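
The scenario and the proposed fix described above can be illustrated with a small, self-contained toy model. This is only a sketch under stated assumptions, not the Kafka Streams implementation: `Coordinator`, `Producer`, `FencedException`, `runScenario`, and the transactional id `app-0_0` are all hypothetical names, the coordinator is reduced to one epoch counter per txnId, and the `startNewTxn` condition is left out. In this model BeginTxn is purely local, so the fence surfaces on the first broker-bound call after it.

```java
import java.util.HashMap;
import java.util.Map;

class FencingSketch {

    /** Toy stand-in for the transaction coordinator: latest epoch per txnId. */
    static class Coordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        /** InitTxn: bump the epoch, fencing every older producer with the same txnId. */
        int initTxn(String txnId) {
            return epochs.merge(txnId, 1, Integer::sum);
        }

        boolean isCurrent(String txnId, int epoch) {
            return epochs.getOrDefault(txnId, 0) == epoch;
        }
    }

    static class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    /** Toy transactional producer: only broker-bound calls can detect fencing. */
    static class Producer {
        private final Coordinator coordinator;
        private final String txnId;
        private final int epoch;
        private boolean dataSent = false;

        Producer(Coordinator coordinator, String txnId) {
            this.coordinator = coordinator;
            this.txnId = txnId;
            this.epoch = coordinator.initTxn(txnId); // InitTxn on creation
        }

        void beginTxn() {
            dataSent = false; // local state only in this model
        }

        void send() {
            checkNotFenced();
            dataSent = true;
        }

        void commitTxn() {
            if (!dataSent) {
                return; // empty txn: no-op, so a stale epoch goes unnoticed
            }
            checkNotFenced();
            dataSent = false;
        }

        private void checkNotFenced() {
            if (!coordinator.isCurrent(txnId, epoch)) {
                throw new FencedException("stale epoch " + epoch + " for " + txnId);
            }
        }
    }

    /** Replays generations 1 to 3; returns "OK" or "FENCED" for Ta. */
    static String runScenario(boolean recreateProducerOnSuspend) {
        Coordinator coordinator = new Coordinator();
        String txnId = "app-0_0"; // hypothetical transactional id of the task

        Producer ta = new Producer(coordinator, txnId); // generation 1: Ta owns the task
        ta.beginTxn();

        Producer tb = new Producer(coordinator, txnId); // generation 2: Tb takes over
        tb.beginTxn();

        try {
            // Generation 3: Ta suspends the task, then gets it assigned again.
            ta.commitTxn(); // no data sent, so this is a no-op
            if (recreateProducerOnSuspend) {
                ta = new Producer(coordinator, txnId); // proposed fix: fresh InitTxn
            }
            ta.beginTxn();
            ta.send();
            ta.commitTxn();
            return "OK";
        } catch (FencedException e) {
            return "FENCED";
        }
    }

    public static void main(String[] args) {
        System.out.println("without fix: " + runScenario(false));
        System.out.println("with fix:    " + runScenario(true));
    }
}
```

Calling `runScenario(false)` replays the reported sequence, where Ta keeps its stale producer after the no-op commit, while `runScenario(true)` recreates the producer during suspension so that Ta claims the newest epoch before resuming.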



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
