Shawn Wang created KAFKA-14024:
----------------------------------

         Summary: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
             Key: KAFKA-14024
             URL: https://issues.apache.org/jira/browse/KAFKA-14024
         Project: Kafka
      Issue Type: Bug
      Components: clients
Affects Versions: 3.2.0
        Reporter: Shawn Wang
Hi, I think this was introduced in https://issues.apache.org/jira/browse/KAFKA-13310.

At https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752 we don't wait for the client to receive the offset commit response, so onJoinPrepareAsyncCommitCompleted is false in a cooperative rebalance. Because the cooperative protocol keeps the partitions the consumer still owns, the next onJoinPrepare call sends another asynchronous auto-commit and again returns before its response arrives, so the consumer keeps committing instead of re-joining the group. I think EAGER mode doesn't have this problem simply because it revokes the partitions even when onJoinPrepareAsyncCommitCompleted=false, and so it does not try to commit again in the next round.

Reproduce:
 * single-node Kafka, broker version 3.2.0 and client version 3.2.0
 * topic1 has 5 partitions
 * start consumer1 (cooperative rebalance)
 * start consumer2 (same consumer group)
 * consumer1 hangs for a long time before re-joining
 * the server log shows that consumer1 hit the rebalance timeout before sending JoinGroup, and then re-joined with another memberId

consumer1's log keeps printing:

16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)

and the coordinator's log:

[2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx generation 57 (__consumer_offsets-30) with 3 members (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,048] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group xxx in CompletingRebalance state. Created a new member id consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,053] INFO [GroupCoordinator 0]: Assignment received from leader consumer-xxx-1-e842a14c-eff7-4b55-9463-72b9c2534afd for group xxx for generation 57. The group has 3 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
[2022-06-26 17:00:44,243] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 57 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-f0298aa0-711c-498e-bdfd-1dd205d7b640 with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)

--
This message was sent by Atlassian Jira
(v8.20.7#820007)
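The retry loop described in this report can be modeled in a few lines. The following is a hypothetical, self-contained Java sketch, not Kafka's ConsumerCoordinator code: `JoinPrepareModel`, `PendingCommit`, `awaitInFlight` and `roundOfJoin` are invented names, and the model assumes that a commit response never arrives on the same poll round that sent it. It shows why checking a freshly sent commit immediately never succeeds when the cooperative protocol keeps its partitions, while EAGER gets through after one round because it has nothing left to commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the reported onJoinPrepare behavior; not Kafka's implementation.
final class JoinPrepareModel {

    enum Protocol { EAGER, COOPERATIVE }

    // Stand-in for an async commit future: it succeeds only from a later poll round onward.
    static final class PendingCommit {
        final int completesAtRound;
        PendingCommit(int completesAtRound) { this.completesAtRound = completesAtRound; }
        boolean succeeded(int currentRound) { return currentRound >= completesAtRound; }
    }

    final Protocol protocol;
    final boolean awaitInFlight; // hypothetical variant: keep checking the earlier request
    final List<String> ownedPartitions = new ArrayList<>();
    PendingCommit inFlight = null;
    int commitsSent = 0;

    JoinPrepareModel(Protocol protocol, boolean awaitInFlight, List<String> partitions) {
        this.protocol = protocol;
        this.awaitInFlight = awaitInFlight;
        this.ownedPartitions.addAll(partitions);
    }

    // Sends a new async commit if there are owned partitions; null means "nothing to commit".
    PendingCommit maybeAutoCommitAsync(int round) {
        if (ownedPartitions.isEmpty()) return null;
        commitsSent++;
        return new PendingCommit(round + 1); // assumption: response arrives on a later poll
    }

    // Returns true when the model lets the consumer go on and send JoinGroup.
    boolean onJoinPrepare(int round) {
        PendingCommit future;
        if (awaitInFlight && inFlight != null) {
            future = inFlight; // re-check the earlier request instead of sending a new one
        } else {
            future = maybeAutoCommitAsync(round); // reported behavior: a new commit every call
            if (awaitInFlight) inFlight = future;
        }
        boolean completed = future == null || future.succeeded(round);
        if (completed) inFlight = null;
        if (protocol == Protocol.EAGER) {
            ownedPartitions.clear(); // EAGER revokes all partitions regardless of the flag
        }
        // COOPERATIVE keeps still-subscribed partitions, so the next call has offsets to commit.
        return completed;
    }

    // Round at which onJoinPrepare first returns true, or -1 if it never does within maxRounds.
    int roundOfJoin(int maxRounds) {
        for (int round = 0; round < maxRounds; round++) {
            if (onJoinPrepare(round)) return round;
        }
        return -1;
    }

    static String describe(Protocol protocol, boolean awaitInFlight) {
        List<String> partitions = List.of("topic1-0", "topic1-1", "topic1-2", "topic1-3", "topic1-4");
        JoinPrepareModel model = new JoinPrepareModel(protocol, awaitInFlight, partitions);
        int round = model.roundOfJoin(100);
        return protocol + " awaitInFlight=" + awaitInFlight
                + ": join round " + round + ", commits sent " + model.commitsSent;
    }

    public static void main(String[] args) {
        System.out.println(describe(Protocol.EAGER, false));       // EAGER awaitInFlight=false: join round 1, commits sent 1
        System.out.println(describe(Protocol.COOPERATIVE, false)); // COOPERATIVE awaitInFlight=false: join round -1, commits sent 100
        System.out.println(describe(Protocol.COOPERATIVE, true));  // COOPERATIVE awaitInFlight=true: join round 1, commits sent 1
    }
}
```

With `awaitInFlight = true` the model keeps checking the earlier request instead of sending a new one, so the cooperative case completes on the next round. This only illustrates the idea of waiting for the commit response mentioned above; it is not a claim about how the issue was or should be fixed in Kafka.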
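For reference, consumer1 and consumer2 in the reproduction could be configured roughly as follows. This is a hedged sketch: the report only says "cooperative rebalance", so using `CooperativeStickyAssignor` is an assumption, as are the broker address and the hypothetical helper name `ReproConsumerConfig`. The keys are plain strings, so the snippet compiles without kafka-clients on the classpath; in a real run the `Properties` would be passed to `new KafkaConsumer<String, String>(props)` from kafka-clients 3.2.0.

```java
import java.util.Properties;

// Hypothetical helper: consumer settings approximating the reproduction in this report.
final class ReproConsumerConfig {

    static Properties build(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: single local broker
        props.put("group.id", groupId);                    // consumer1 and consumer2 share this
        // Assumption: the built-in cooperative assignor; any COOPERATIVE-protocol assignor applies.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        props.put("enable.auto.commit", "true"); // the default; the log shows async auto-commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings; Properties iteration order is not guaranteed.
        build("xxx").forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```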