[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-09-05 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17600551#comment-17600551
 ] 

Philip Nee commented on KAFKA-14024:


Hey [~aiquestion] and [~showuon], it seems like the changes here are causing 
duplicated consumption (KAFKA-14196), which is a regression, if I'm 
understanding the logic correctly. In short, I think the poll loop now 
continues to fetch data while it keeps waiting for the async commit to 
complete. So the issue is that the fetcher progresses beyond the current 
commit, and once the async commit completes, the partition gets revoked and 
the progress made in the meantime is lost. I think the timing has to be right 
to reproduce this issue, i.e. it usually happens when the async commit 
doesn't complete during the initial poll loop.

 

I could be wrong, but that's my finding so far.
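The duplicate-consumption window described in this comment comes down to simple offset arithmetic. Below is a minimal, hypothetical model of it, not Kafka client code. It assumes two things:

- the next owner of the partition resumes from the offset committed at revocation time;
- every record that `poll()` returned past that offset, while the async commit was still pending, is therefore read a second time.

The class and method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Hypothetical model, not Kafka client code: it only illustrates the offset arithmetic.
class DuplicateWindow {

    /**
     * Offsets a new owner would consume a second time after the partition is revoked.
     *
     * @param committedAtRevocation offset committed for the partition when it was revoked
     * @param fetchPosition         next offset the old owner would have fetched; it kept
     *                              polling while the async commit was still pending
     */
    static List<Long> duplicatedOffsets(long committedAtRevocation, long fetchPosition) {
        if (fetchPosition <= committedAtRevocation) {
            return List.of(); // no progress past the commit, so nothing is read twice
        }
        // The new owner resumes at committedAtRevocation, so [committed, position) is read twice.
        return LongStream.range(committedAtRevocation, fetchPosition)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The async commit sent in onJoinPrepare carried offset 5, but poll() kept returning
        // records up to position 9 before the commit completed and the partition was revoked.
        System.out.println(duplicatedOffsets(5, 9)); // prints [5, 6, 7, 8]
    }
}
```

The wider the gap between the in-flight commit and the fetch position, the more records are re-consumed. This matches the observation that the problem depends on how long the async commit stays incomplete.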

 

> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> --
>
> Key: KAFKA-14024
> URL: https://issues.apache.org/jira/browse/KAFKA-14024
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.2.0
>Reporter: Shawn Wang
>Assignee: Shawn Wang
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.1
>
>
> Hi 
> In https://issues.apache.org/jira/browse/KAFKA-13310, we tried to fix an issue 
> where consumer#poll(duration) could return later than the provided duration. 
> This happens because, if a rebalance is needed, we first commit the current 
> offsets synchronously before the rebalance. If the offset commit takes too 
> long, consumer#poll spends more time than the provided duration. To fix that, 
> we replaced the synchronous commit with an asynchronous one before the 
> rebalance (i.e. in onJoinPrepare).
>  
> However, in this ticket, we found that the async commit keeps sending a new 
> commit request during each Consumer#poll, because the offset commit never 
> completes in time. The impact is that the existing consumer is kicked 
> out of the group after the rebalance timeout without ever joining. For 
> example, suppose consumer A is in group G and consumer B then joins the 
> group; after the rebalance, only consumer B is in the group.
>  
> The workaround for this issue is to switch back to an eager assignor, 
> e.g. StickyAssignor or RoundRobinAssignor.
>  
> To fix the issue, we came up with 2 solutions:
>  # We can explicitly wait for the async commit to complete in onJoinPrepare, 
> but that would bring back the KAFKA-13310 issue.
>  # We can keep the future of the async offset commit that is currently in 
> flight, so that each Consumer#poll waits on that same future until it 
> completes.
>  
> Besides, we found another bug while fixing this one. Before KAFKA-13310, we 
> committed offsets synchronously with rebalanceTimeout, retrying on retriable 
> errors until the timeout. After KAFKA-13310, we assumed we still retry, but 
> the retry now happens after the partitions are revoked. So even if the 
> retried offset commit succeeds, some partitions' offsets are left 
> uncommitted, and after the rebalance other consumers will consume 
> overlapping records.
>  
>  
> ===
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>  
> We don't wait for the client to receive the offset commit response here, so 
> onJoinPrepareAsyncCommitCompleted stays false in a cooperative rebalance, and 
> the client keeps looping on onJoinPrepare.
> I think EAGER mode doesn't have this problem because it revokes the 
> partitions even if onJoinPrepareAsyncCommitCompleted=false, and does not try 
> to commit again in the next round.
> To reproduce:
>  * single-node Kafka 3.2.0 broker with a 3.2.0 client
>  * topic1 has 5 partitions
>  * start consumer1 (cooperative rebalance)
>  * start consumer2 (same consumer group)
>  * consumer1 hangs for a long time before rejoining
>  * the server log shows consumer1 hitting the rebalance timeout before 
> JoinGroup and rejoining with another memberId
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 
> 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 
> (ConsumerCoordinator.java:739)
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of 
> offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} 
> (ConsumerCoordinator.java:1143)
>  
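> and the coordinator's log:
> [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance 
> group xxx in state PreparingRebalance with old generation 56 
> (__consumer_offsets-30) (reason: Adding new member 
> consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id 
> None; client reason: rebalance failed due to 'The group member needs to have 
> a valid member id before actually entering a consumer group.' 
> (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed 
> dynamic members who haven't joined: 
> Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) 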
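The eager-assignor workaround from the description amounts to one consumer setting. The sketch below builds the consumer `Properties` with an eager assignor.

- The `partition.assignment.strategy` key and the assignor class names are real Kafka client settings.
- The broker address and group id are placeholder assumptions.

The resulting object would be passed to `new KafkaConsumer<>(props)`. All members of the group need at least one strategy in common for the group to form.

```java
import java.util.Properties;

// Sketch of the eager-assignor workaround. Only the assignor line matters for the workaround;
// the broker address and group id are placeholder assumptions for a local test setup.
class EagerAssignorWorkaround {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local single-node broker
        props.put("group.id", "example-group");            // hypothetical group name
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Eager assignor instead of CooperativeStickyAssignor;
        // org.apache.kafka.clients.consumer.RoundRobinAssignor is the other option the ticket names.
        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```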
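The reported loop and the second proposed solution can be contrasted with a small tick-based model. It is a hypothetical simulation, not ConsumerCoordinator code. It assumes:

- one tick stands for one Consumer#poll call;
- every async commit takes a fixed number of ticks to complete;
- the rebalance timeout is also counted in ticks.

The buggy variant sends a fresh commit on every poll and checks it immediately. The fixed variant keeps the in-flight commit across polls.

```java
// Hypothetical tick-based model (not Kafka code). One tick = one Consumer#poll call; an async
// commit sent at tick t is assumed to complete at tick t + latencyTicks.
class JoinPrepareModel {

    /** Whether onJoinPrepare got past the commit, how many polls it took, how many commits were sent. */
    record Outcome(boolean proceeded, int pollsUsed, int commitsSent) { }

    /**
     * @param keepInflightFuture false = reported behaviour (new commit every poll, checked at once);
     *                           true  = solution 2 (reuse the pending commit on later polls)
     */
    static Outcome simulate(boolean keepInflightFuture, int latencyTicks, int rebalanceTimeoutTicks) {
        Integer inflightSentAt = null; // tick at which the tracked commit was sent, if any
        int commitsSent = 0;
        for (int tick = 0; tick < rebalanceTimeoutTicks; tick++) {
            if (!keepInflightFuture || inflightSentAt == null) {
                inflightSentAt = tick; // send a new async commit, discarding any previous one
                commitsSent++;
            }
            boolean completed = tick >= inflightSentAt + latencyTicks;
            if (completed) {
                return new Outcome(true, tick + 1, commitsSent); // go on to revoke and send JoinGroup
            }
            // Not completed: return from poll() and re-enter onJoinPrepare on the next poll.
        }
        // Never got past the commit: the coordinator drops the member after the rebalance timeout.
        return new Outcome(false, rebalanceTimeoutTicks, commitsSent);
    }

    public static void main(String[] args) {
        System.out.println(simulate(false, 2, 10)); // reported behaviour
        System.out.println(simulate(true, 2, 10));  // solution 2
    }
}
```

The scenario assumes a two-tick commit latency and a ten-tick timeout.

- The buggy variant sends ten commits and never proceeds.
- Keeping the in-flight future proceeds on the third poll after a single commit.

With zero latency, both variants proceed on the first poll, which is consistent with the bug only appearing when the commit response is slow.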
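The second bug in the description is a question of ordering: does the retried commit land before or after the partition is handed off? Below is a hypothetical event-ordering model, not Kafka code. It assumes:

- the next owner resumes from whatever offset is committed at the moment of hand-off;
- a commit that succeeds after the hand-off no longer affects where the next owner starts.

The offsets 5 and 9 are illustrative values.

```java
import java.util.List;

// Hypothetical event-ordering model (not Kafka code) for the retry-after-revocation problem.
class CommitOrderingModel {

    enum Event { RETRY_COMMIT_SUCCEEDS, REVOKE_AND_HAND_OFF }

    /**
     * Offset the next owner resumes from. The committed offset starts at committedBefore;
     * a successful retried commit moves it to processedUpTo. The hand-off reads whatever
     * is committed at that moment.
     */
    static long handOffOffset(List<Event> order, long committedBefore, long processedUpTo) {
        long committed = committedBefore;
        for (Event event : order) {
            if (event == Event.REVOKE_AND_HAND_OFF) {
                return committed; // the new owner starts here; later events are ignored
            }
            committed = processedUpTo; // RETRY_COMMIT_SUCCEEDS
        }
        throw new IllegalArgumentException("order must contain REVOKE_AND_HAND_OFF");
    }

    public static void main(String[] args) {
        // Before KAFKA-13310: the synchronous commit is retried before the revocation.
        long before = handOffOffset(List.of(Event.RETRY_COMMIT_SUCCEEDS, Event.REVOKE_AND_HAND_OFF), 5, 9);
        // After KAFKA-13310: the retry only happens after the partitions were revoked.
        long after = handOffOffset(List.of(Event.REVOKE_AND_HAND_OFF, Event.RETRY_COMMIT_SUCCEEDS), 5, 9);
        System.out.println(before + " vs " + after); // prints 9 vs 5
    }
}
```

In the second ordering, offsets 5 through 8 are consumed again by the new owner. These are the "overlapping records" the description mentions.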
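The explanation of why EAGER mode escapes the loop can be checked with another hypothetical model, not the 3.2.0 ConsumerCoordinator source. It assumes:

- `onJoinPrepare` sends an async auto-commit only when the member still owns partitions;
- that commit never completes within the poll in which it was sent;
- eager revokes everything inside `onJoinPrepare`, while cooperative keeps its partitions.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model (not Kafka code) of onJoinPrepare under the two rebalance protocols,
// assuming the async auto-commit never completes in the same poll in which it is sent.
class ProtocolModel {

    enum Protocol { EAGER, COOPERATIVE }

    final Protocol protocol;
    final Set<String> owned; // partitions currently owned by this member
    int commitsSent = 0;

    ProtocolModel(Protocol protocol, Set<String> owned) {
        this.protocol = protocol;
        this.owned = new HashSet<>(owned);
    }

    /** Returns true when the member may go on to send JoinGroup. */
    boolean onJoinPrepare() {
        boolean commitCompleted;
        if (owned.isEmpty()) {
            commitCompleted = true;  // nothing to commit, so no request is sent
        } else {
            commitsSent++;           // async auto-commit of the owned partitions
            commitCompleted = false; // response not received yet: the reported bug
        }
        if (protocol == Protocol.EAGER) {
            owned.clear(); // eager revokes all partitions before rejoining
        }
        // Cooperative keeps its partitions here, so the next call commits them again.
        return commitCompleted;
    }

    /** Calls onJoinPrepare once per poll; returns the poll number at which it proceeds, or -1. */
    int pollsUntilJoin(int maxPolls) {
        for (int poll = 1; poll <= maxPolls; poll++) {
            if (onJoinPrepare()) {
                return poll;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        ProtocolModel eager = new ProtocolModel(Protocol.EAGER, Set.of("topic1-4"));
        ProtocolModel coop = new ProtocolModel(Protocol.COOPERATIVE, Set.of("topic1-4"));
        System.out.println("eager: " + eager.pollsUntilJoin(10));
        System.out.println("cooperative: " + coop.pollsUntilJoin(10));
    }
}
```

Under these assumptions, the two protocols diverge after the first poll:

- the eager member proceeds on its second poll after one commit;
- the cooperative member never proceeds and sends a commit on every poll.

This mirrors the repeated "Sending asynchronous auto-commit" lines in consumer1's log.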

[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568783#comment-17568783
 ] 

Luke Chen commented on KAFKA-14024:
---

> In this way we would not need a separate timer inside the `onJoinPrepare` for 
> the commit itself. 

[~guozhang], thanks for the suggestion. Yes, that looks simpler! I like it. 
But since the release date is approaching and a new rebalance protocol 
(KIP-848) is coming soon, I'm going to merge it as is. But again, thanks for 
the comment. I learned something from it. Thanks.

 

[~aiquestion], thanks again for finding the issue and the PR!


[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568732#comment-17568732
 ] 

Guozhang Wang commented on KAFKA-14024:
---

Thanks to [~aiquestion] for filing this and also submitting the PR, I've added 
you as a contributor and assigned the ticket to you too.


[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568731#comment-17568731
 ] 

Guozhang Wang commented on KAFKA-14024:
---

Hello [~mumrah], I took a look at the ticket and at the PR 
(https://github.com/apache/kafka/pull/12349/files) as well, and I agree with 
[~showuon] that this is a pretty bad regression that we should fix asap, and 
hence it is worth treating as a blocker for 3.2.1.

As for the PR, personally I'd simplify it a bit compared to the current fix, 
making `onJoinPrepare` more re-entrant and idempotent. More specifically, when 
the caller thread of `poll` enters `onJoinPrepare`, it checks whether a commit 
is already in flight and whether it has completed; if not, it sends out the 
request (when none is in flight yet) and returns from `onJoinPrepare` 
immediately, and hence from the `poll` call as well. The next `poll` call 
re-enters `onJoinPrepare` and checks whether the commit request has completed; 
only once the maintained commit future has completed does it continue within 
the function to revoke partitions, trigger callbacks, etc. This way we would 
not need a separate timer inside `onJoinPrepare` for the commit itself. But 
since [~showuon] is almost done reviewing it, I'll leave it to him rather than 
block merging it.
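The re-entrant, idempotent `onJoinPrepare` suggested here can be sketched as a small state holder around one commit future. This is a hypothetical illustration, not the code that was merged for this ticket; the comment implies the PR under review used a separate timer instead.

- `sendAsyncCommit` is an assumed stand-in for issuing the offset commit request.
- A `CompletableFuture` plays the role of the maintained commit future.
- The sketch treats any completion, success or failure, as "done". It does not model retriable errors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch (not Kafka code) of a re-entrant, idempotent onJoinPrepare that keeps
// one commit future across poll() calls and revokes only once that future is done.
class ReentrantJoinPrepare {

    private final Supplier<CompletableFuture<Void>> sendAsyncCommit; // assumed commit-request stand-in
    private CompletableFuture<Void> pendingCommit;                   // survives across poll() calls
    int commitsSent = 0;
    int revocations = 0;

    ReentrantJoinPrepare(Supplier<CompletableFuture<Void>> sendAsyncCommit) {
        this.sendAsyncCommit = sendAsyncCommit;
    }

    /** Returns true once partitions are revoked and the caller may continue with JoinGroup. */
    boolean onJoinPrepare() {
        if (pendingCommit == null) {
            pendingCommit = sendAsyncCommit.get(); // first entry: send the commit request
            commitsSent++;
        }
        if (!pendingCommit.isDone()) {
            return false; // return from onJoinPrepare (and poll); the next poll re-enters here
        }
        // Commit finished: revoke partitions and trigger callbacks exactly once.
        revocations++;
        pendingCommit = null; // ready for the next rebalance
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        ReentrantJoinPrepare prep = new ReentrantJoinPrepare(() -> response);
        System.out.println(prep.onJoinPrepare()); // commit sent, still pending
        System.out.println(prep.onJoinPrepare()); // same future re-checked, no second request
        response.complete(null);                  // the commit response arrives
        System.out.println(prep.onJoinPrepare()); // revoke and proceed
    }
}
```

Because the future is kept rather than recreated, repeated `poll` calls neither resend the commit nor run the revocation twice. This is the property the comment asks for.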

In the new rebalance protocol (KIP-848) we would have a much simpler model on 
the client side, so hopefully we will not fall into this awkward design 
pattern anymore.


[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17567453#comment-17567453
 ] 

Luke Chen commented on KAFKA-14024:
---

[~mumrah], thanks for asking. I think this is a blocker for v3.2.1, since a 
v3.2.0 consumer using the cooperative assignor can hang during rebalance. The 
PR has already had 3 rounds of review and should be completed soon. 

cc [~aiquestion] 


[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-15 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17567274#comment-17567274
 ] 

David Arthur commented on KAFKA-14024:
--

[~showuon] / [~guozhang] is this a blocker for a bug fix release on 3.2.x? I'm 
currently working on the 3.2.1 plan and this issue is the only open blocker. 
How close are we to having a fix? 

Could we wait on this issue until a future release (3.2.2)?
