[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14024:
------------------------------
    Description: 
Hi 

In https://issues.apache.org/jira/browse/KAFKA-13310, we tried to fix an issue 
where consumer#poll(duration) could return later than the provided duration. 
This happened because, when a rebalance is needed, we first commit the current 
offsets synchronously before rebalancing. If the offset commit takes too long, 
consumer#poll spends more time than the provided duration. To fix that, we 
changed the sync commit to an async commit before rebalance (i.e. in 
onJoinPrepare).

 

However, in this ticket, we found that the async commit keeps sending a new 
commit request on each Consumer#poll, because the offset commit never 
completes in time. The impact is that the existing consumer is kicked out of 
the group after the rebalance timeout without rejoining the group. That is, 
suppose we have consumer A in group G and consumer B now joins the group; 
after the rebalance, only consumer B is in the group.

 

The workaround for this issue is to switch the assignor back to an eager 
assignor, e.g. StickyAssignor or RoundRobinAssignor.
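
As a minimal sketch of this workaround, the consumer configuration below selects StickyAssignor through the standard partition.assignment.strategy setting. The broker address, group id, and deserializers are placeholder assumptions, and the class and method names are made up for illustration. The sketch uses only java.util.Properties, so it builds the configuration without creating a consumer.

```java
import java.util.Properties;

class EagerAssignorWorkaround {
    // Builds consumer properties that pin an eager assignor.
    // bootstrapServers and groupId are placeholders supplied by the caller.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Eager assignor instead of CooperativeStickyAssignor.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "xxx" are example values only.
        Properties props = consumerProps("localhost:9092", "xxx");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```

These properties would then be passed to the KafkaConsumer constructor in place of a configuration that names CooperativeStickyAssignor.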

 

To fix the issue, we came up with 2 solutions:
 # We can explicitly wait for the async commit to complete in onJoinPrepare, 
but that would let the KAFKA-13310 issue happen again.
 # We can keep track of the async commit offset future that is currently in 
flight, so that on each Consumer#poll we wait for that future to complete 
instead of sending a new commit request.
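
The second option can be illustrated with a toy model. This is only a sketch of the idea, not the actual ConsumerCoordinator code: the class InflightCommitSketch, its field names, and the use of CompletableFuture are assumptions made for illustration (the real client uses its own request future type).

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class InflightCommitSketch {
    // The commit currently in flight, or null when nothing is pending.
    private CompletableFuture<Void> inflightCommit;
    private int requestsSent = 0;

    // Called once per poll while a rebalance is pending.
    // Returns true once the commit has finished and the join can proceed.
    boolean onJoinPrepare(Supplier<CompletableFuture<Void>> sendAsyncCommit) {
        if (inflightCommit == null) {
            // Send a request only when none is already in flight.
            inflightCommit = sendAsyncCommit.get();
            requestsSent++;
        }
        if (!inflightCommit.isDone()) {
            return false; // not finished yet; check the same future on the next poll
        }
        // isDone() is also true for a failed commit; error handling is omitted here.
        inflightCommit = null;
        return true;
    }

    int requestsSent() {
        return requestsSent;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> response = new CompletableFuture<>();
        InflightCommitSketch coordinator = new InflightCommitSketch();
        boolean first = coordinator.onJoinPrepare(() -> response);
        boolean second = coordinator.onJoinPrepare(() -> response);
        response.complete(null); // the broker response arrives
        boolean third = coordinator.onJoinPrepare(() -> response);
        // prints "false false true sent=1"
        System.out.println(first + " " + second + " " + third
                + " sent=" + coordinator.requestsSent());
    }
}
```

Because the future is kept across calls, each poll returns promptly (preserving the KAFKA-13310 behavior) while only one commit request is outstanding at a time.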

 

Besides, another bug was found while fixing this one. Before KAFKA-13310, we 
committed offsets synchronously with the rebalanceTimeout, retrying on 
retriable errors until the timeout. After KAFKA-13310, we thought we still had 
a retry, but the retry happens after the partitions are revoked. That is, even 
if the retried offset commit succeeds, some partitions' offsets are left 
uncommitted, and after the rebalance other consumers will consume overlapping 
records.

 

 

===

[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]

 

We don't wait for the client to receive the offset commit response here, so 
onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, 
and the client will loop, invoking onJoinPrepare repeatedly.

I think EAGER mode doesn't have this problem because it revokes the 
partitions even if onJoinPrepareAsyncCommitCompleted=false and does not try to 
commit in the next round.
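
The difference between the two modes can be pictured with a small model. It is a deliberately simplified sketch, not the client's logic: the class name, the round counter, and the rule that a commit counts as complete only when no partitions are owned are all assumptions chosen to mirror the description above.

```java
class OnJoinPrepareLoopModel {
    enum Protocol { EAGER, COOPERATIVE }

    // Returns the round in which onJoinPrepare reports completion,
    // or -1 if it never does within maxRounds.
    static int roundsUntilPrepared(Protocol protocol, int ownedPartitions, int maxRounds) {
        int owned = ownedPartitions;
        for (int round = 1; round <= maxRounds; round++) {
            // Model assumption: a fresh async commit is sent every round and its
            // response has not arrived when the flag is checked, so the flag is
            // true only when there is nothing left to commit.
            boolean asyncCommitCompleted = (owned == 0);
            if (protocol == Protocol.EAGER) {
                owned = 0; // eager revokes all partitions even if the commit is pending
            }
            if (asyncCommitCompleted) {
                return round;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // prints "2" then "-1"
        System.out.println(roundsUntilPrepared(Protocol.EAGER, 5, 10));
        System.out.println(roundsUntilPrepared(Protocol.COOPERATIVE, 5, 10));
    }
}
```

In this model the eager consumer finishes on the second round, while the cooperative consumer keeps its partitions and therefore keeps committing until the rebalance timeout removes it from the group.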

To reproduce:
 * single-node Kafka version 3.2.0 and client version 3.2.0
 * topic1 has 5 partitions
 * start consumer1 (cooperative rebalance)
 * start another consumer, consumer2 (same consumer group)
 * consumer1 hangs for a long time before rejoining
 * the server log shows that consumer1 hits the rebalance timeout before 
JoinGroup and rejoins with another memberId

consumer1's log keeps printing:

16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 
and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 
(ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of 
offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} 
(ConsumerCoordinator.java:1143)

 

and coordinator's log:

[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 56 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic 
members who haven't joined: 
Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx 
generation 57 (__consumer_offsets-30) with 3 members 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with 
unknown member id joins group xxx in CompletingRebalance state. Created a new 
member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the 
member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from 
leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for 
generation 57. The group has 3 members, 0 of which are static. 
(kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance 
group xxx in state PreparingRebalance with old generation 57 
(__consumer_offsets-30) (reason: Adding new member 
consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id 
None; client reason: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)


> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-14024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.2.0
>            Reporter: Shawn Wang
>            Priority: Blocker
>              Labels: new-consumer-threading-should-fix
>             Fix For: 3.3.0, 3.2.1



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
