cadonna commented on code in PR #15035: URL: https://github.com/apache/kafka/pull/15035#discussion_r1431466392
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -728,7 +732,6 @@ public boolean shouldSkipHeartbeat() { @Override public void transitionToStaled() { memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; - currentAssignment.clear(); Review Comment: Why do we not need to remove the current assignment if we leave the group? Is this because of `updateSubscription()` on line 677? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); - if (pollTimer.isExpired()) { + // If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before + // sending another leave group. Review Comment: Could you please elaborate why we need to wait? It is not clear to me, because I thought that we can leave the group whenever we want. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -184,7 +184,9 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); - if (pollTimer.isExpired()) { + // If the poll timer expires during reconciliation, we need to wait till the reconciliation completes before + // sending another leave group. + if (pollTimer.isExpired() && membershipManager.state() == MemberState.STABLE) { Review Comment: Is there a unit test that tests the behavior when this condition is not satisfied? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -672,7 +672,11 @@ public void onHeartbeatRequestSent() { MemberState state = state(); if (isStaled()) { log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); - transitionToJoining(); + // clear the current assignment and subscription, and trigger rebalance listener on the next poll + invokeOnPartitionsRevokedCallback(subscriptions.assignedPartitions()).whenComplete((r, e) -> { Review Comment: Is it correct to call `onPartitionRevoked()` here? Technically, the member is not a member of the group anymore. I am not sure it should be allowed to do anything group-related like committing offsets which is one of the main purposes of `onPartitionRevoked()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org