[ 
https://issues.apache.org/jira/browse/KAFKA-7285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16579033#comment-16579033
 ] 

ASF GitHub Bot commented on KAFKA-7285:
---------------------------------------

mjsax opened a new pull request #5501: KAFKA-7285: Create new producer on each 
rebalance if EOS enabled
URL: https://github.com/apache/kafka/pull/5501
 
 
   - close producer on suspend() if EOS enabled
   - create new producer on resume() if EOS enabled
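
A minimal sketch of the two changes listed above, assuming a hypothetical `TxnProducer` interface and `EosTaskSketch` wrapper. Neither name comes from the Kafka code base or from this pull request; they only illustrate the lifecycle, in which every resume under EOS gets a fresh producer that runs its own transaction initialization.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a transactional producer; NOT the Kafka client API.
interface TxnProducer {
    void initTransactions();
    void close();
}

// Hypothetical task wrapper that illustrates the suspend()/resume() behaviour.
class EosTaskSketch {
    private final boolean eosEnabled;
    private final Supplier<TxnProducer> producerSupplier;
    private TxnProducer producer;

    EosTaskSketch(boolean eosEnabled, Supplier<TxnProducer> producerSupplier) {
        this.eosEnabled = eosEnabled;
        this.producerSupplier = producerSupplier;
        this.producer = producerSupplier.get();
        if (eosEnabled) {
            producer.initTransactions(); // task creation: claim the transactional id
        }
    }

    void suspend() {
        // The commit of the current transaction would happen before this point.
        if (eosEnabled) {
            producer.close(); // drop the producer on every suspend
            producer = null;
        }
    }

    void resume() {
        if (eosEnabled) {
            producer = producerSupplier.get();
            producer.initTransactions(); // fresh producer re-claims the transactional id
        }
    }

    TxnProducer producer() {
        return producer;
    }
}
```

With EOS disabled, the sketch keeps the same producer across suspend and resume.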
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Streams should be more fencing-sensitive during task suspension under EOS
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-7285
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7285
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: Guozhang Wang
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> When EOS is turned on, Streams performs the following steps:
> 1. InitTxn in task creation.
> 2. BeginTxn in topology initialization.
> 3. AbortTxn in clean shutdown.
> 4. CommitTxn in commit(), which is called in suspend() as well.
> Now consider this situation, with two threads (Ta and Tb) and one task:
> 1. Originally Ta owns the task; the consumer generation is 1.
> 2. Ta fails to send heartbeats and gets kicked out of the group. A new 
> generation 2 is formed with Tb in it, and the task is migrated to Tb without 
> Ta knowing.
> 3. Ta finally calls `consumer.poll` and becomes aware of the rebalance. It 
> re-joins the group, forming a new generation 3, and during the rebalance 
> the leader decides to assign the task back to Ta.
> 4.a) Ta calls onPartitionsRevoked on the task, suspending it and calling commit. 
> However, if no data has been sent since `BeginTxn`, this commit call 
> becomes a no-op.
> 4.b) Ta then calls onPartitionsAssigned on the task, resuming it, and then 
> calls BeginTxn. It then encounters a ProducerFencedException, 
> incorrectly.
> The root cause is that Ta does not trigger InitTxn to claim "I'm the newest 
> for this txnId, and am going to fence everyone else with the same txnId", so 
> it is mistakenly treated as an older client than Tb.
> Note that this issue is not common, since it requires a txn that did not 
> send any data at all, which makes its commitTxn call a no-op, so that the 
> producer is not fenced earlier on.
> One proposal for this issue is to close the producer and create a new one 
> in `suspend` after the commitTxn call has succeeded and `startNewTxn` is false, 
> so that the new producer will always `initTxn` to fence others.
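
The scenario quoted above can be walked through with a toy model. This is only an illustrative sketch: `ToyCoordinator`, `FencingScenario`, the `verify` check and the transactional id are hypothetical names, and the epoch counter is a simplification of the fencing behaviour described in the issue, not Kafka's actual broker logic.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: tracks one epoch per transactional id.
class ToyCoordinator {
    private final Map<String, Integer> epochs = new HashMap<>();

    // Models InitTxn: bump the epoch, which fences older producers with the same id.
    int initTxn(String txnId) {
        return epochs.merge(txnId, 1, Integer::sum);
    }

    // Models any check of a producer's epoch against the latest one.
    void verify(String txnId, int epoch) {
        if (epoch < epochs.getOrDefault(txnId, 0)) {
            throw new IllegalStateException("fenced: epoch " + epoch + " is stale for " + txnId);
        }
    }
}

class FencingScenario {
    // Returns true if Ta is fenced after the task is assigned back to it.
    static boolean taFencedOnResume(boolean recreateProducerOnResume) {
        ToyCoordinator coordinator = new ToyCoordinator();
        String txnId = "app-0_0"; // hypothetical transactional id for the task

        int taEpoch = coordinator.initTxn(txnId); // generation 1: Ta creates the task
        coordinator.initTxn(txnId);               // generation 2: Tb creates the task, Ta is now stale

        // Generation 3: the task returns to Ta. Its suspend-time commit was a
        // no-op (no data sent), so the stale epoch was never checked.
        if (recreateProducerOnResume) {
            taEpoch = coordinator.initTxn(txnId); // new producer claims the newest epoch
        }
        try {
            coordinator.verify(txnId, taEpoch);   // first real check after resume
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

In this model `FencingScenario.taFencedOnResume(false)` reports that Ta is fenced, while `taFencedOnResume(true)` does not, which matches the motivation for the proposal.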



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
