lucasbru commented on code in PR #15415:
URL: https://github.com/apache/kafka/pull/15415#discussion_r1499180116
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -215,6 +216,7 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
 }
 NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false);
+ membershipManager.onHeartbeatRequestSent();

Review Comment:
   Would it not be more consistent to run this on a leave heartbeat as well, and just handle the state correctly internally?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -744,16 +767,34 @@ public boolean isLeavingGroup() {
 return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
 }
+ @Override
+ public void maybeRejoinStaleMember() {
+ if (state == MemberState.STALE) {
+ staleMemberAssignmentRelease.whenComplete((__, error) -> transitionToJoining());

Review Comment:
   Can we avoid enqueuing many of these completion stages while `staleMemberAssignmentRelease` has not completed yet? Although the operation looks idempotent, it's not good to create an arbitrary number of futures here (a new one is created with every `PollApplicationEvent`)!

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java:
##########
@@ -132,11 +132,20 @@ public enum MemberState {
 PREPARE_LEAVING.previousValidStates = Arrays.asList(JOINING, STABLE, RECONCILING, ACKNOWLEDGING, UNSUBSCRIBED, FENCED);
- LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
+ // Transition from prepare leaving to leaving is the expected one in all close operations
+ // except for when the poll timer expires (ex. leave group due to unsubscribe or consumer
+ // close, where member triggers callbacks first while it continues sending heartbeat
+ // (PREPARE_LEAVE state) and then sends the heartbeat to leave (LEAVING state).
+ // All other transitions directly to LEAVING are expected when the member leaves due to
+ // expired poll timer. In that case, the member sends the heartbeat to leave first, and
+ // then invokes callbacks to release assignment while STALE, not sending any more
+ // heartbeats while STALE because it has been already removed from the group on the broker.
+ LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING, JOINING, RECONCILING,

Review Comment:
   Couldn't we avoid this long comment and complexity by transitioning through `PREPARE_LEAVING` either way, even if we don't do much there? I don't like how a corner case (poll timer expiration) complicates the state machine so much.

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -564,7 +587,7 @@ public void transitionToJoining() {
 */
 @Override
 public CompletableFuture<Void> leaveGroup() {
- if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+ if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE) {

Review Comment:
   Do we really want to allow leaving the group when we are already fenced? As far as I can see, we'd invoke `onPartitionsLost` when being fenced and then `onPartitionsRevoked` again when leaving the group. Can we just use a `nonInGroup` check here?

-- 
This is an automated message from the Apache Git Service.
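On the first `HeartbeatRequestManager` review comment (calling `onHeartbeatRequestSent()` for leave heartbeats too): the sketch below assumes the membership side decides what a sent heartbeat means from its own state. All names (`HeartbeatSentSketch`, `HeartbeatSenderSketch`, `pollTimerExpired`, `targetAssignmentReconciled`) are hypothetical, and the ACKNOWLEDGING handling is an assumption for illustration, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced state set; not the real MemberState.
enum HeartbeatSketchState { ACKNOWLEDGING, RECONCILING, STABLE, LEAVING, STALE, UNSUBSCRIBED }

// Hypothetical membership side: one callback for every heartbeat sent, regular or leave.
class HeartbeatSentSketch {
    HeartbeatSketchState state = HeartbeatSketchState.STABLE;
    boolean pollTimerExpired = false;          // hypothetical flag
    boolean targetAssignmentReconciled = true; // hypothetical flag

    void onHeartbeatRequestSent() {
        switch (state) {
            case ACKNOWLEDGING:
                // Assumed: the acknowledgement went out, so move on depending on reconciliation.
                state = targetAssignmentReconciled
                        ? HeartbeatSketchState.STABLE
                        : HeartbeatSketchState.RECONCILING;
                break;
            case LEAVING:
                // The leave heartbeat went out; the state decides where the member ends up.
                state = pollTimerExpired
                        ? HeartbeatSketchState.STALE
                        : HeartbeatSketchState.UNSUBSCRIBED;
                break;
            default:
                break; // nothing to do for other states in this sketch
        }
    }
}

// Hypothetical heartbeat side: the same call is made after building either kind of request.
class HeartbeatSenderSketch {
    final List<String> sent = new ArrayList<>();

    void sendHeartbeat(HeartbeatSentSketch member) {
        boolean leave = member.state == HeartbeatSketchState.LEAVING;
        sent.add(leave ? "leave" : "regular");
        member.onHeartbeatRequestSent();
    }
}
```

With this shape the heartbeat manager has no special case for leave heartbeats; the poll-timer corner case lives entirely inside the membership state handling.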
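On the `maybeRejoinStaleMember` review comment: one way to avoid chaining a new stage on every `PollApplicationEvent` is a guard flag that allows one `whenComplete` per STALE episode. This is a minimal single-threaded sketch with hypothetical names (`StaleRejoinSketch`, `rejoinScheduled`, `transitionToStale`); it is not the PR's implementation and ignores which thread completes the release future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced stand-in for MemberState with only the states this sketch uses.
enum SketchMemberState { STABLE, STALE, JOINING }

// Hypothetical, single-threaded sketch; not the PR's MembershipManagerImpl.
class StaleRejoinSketch {
    SketchMemberState state = SketchMemberState.STABLE;
    // Plays the role of staleMemberAssignmentRelease: completes once the callbacks
    // releasing the stale member's assignment have finished.
    CompletableFuture<Void> staleMemberAssignmentRelease = CompletableFuture.completedFuture(null);
    // Guard: true once a rejoin has been chained onto the current release future.
    private boolean rejoinScheduled = false;
    // Counts how often the rejoin action ran (illustration only).
    int joinTransitions = 0;

    // Entering STALE installs a new release future and resets the guard for this episode.
    void transitionToStale(CompletableFuture<Void> assignmentRelease) {
        state = SketchMemberState.STALE;
        staleMemberAssignmentRelease = assignmentRelease;
        rejoinScheduled = false;
    }

    // Safe to call on every poll event: chains at most one dependent stage per STALE episode.
    void maybeRejoinStaleMember() {
        if (state != SketchMemberState.STALE || rejoinScheduled) {
            return;
        }
        rejoinScheduled = true;
        // As in the PR, the member rejoins whether or not the release completed exceptionally.
        staleMemberAssignmentRelease.whenComplete((ignored, error) -> transitionToJoining());
    }

    private void transitionToJoining() {
        state = SketchMemberState.JOINING;
        joinTransitions++;
    }
}
```

A poll-driven alternative is to chain nothing and call `transitionToJoining()` once a later poll observes `staleMemberAssignmentRelease.isDone()`. That creates no extra stages, but the rejoin then waits for the next poll event.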
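On the `MemberState` review comment: if the poll-timer path also passes through `PREPARE_LEAVING`, `LEAVING` can keep `PREPARE_LEAVING` as its only valid previous state. The sketch mirrors the `previousValidStates` pattern from the diff on a hypothetical, reduced enum (`LeaveSketchState`) and driver (`LeaveSketchMachine`). Making `LEAVING` the only valid predecessor of `STALE` is an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, reduced enum modeled loosely on MemberState; not the real state machine.
enum LeaveSketchState {
    STABLE, RECONCILING, PREPARE_LEAVING, LEAVING, STALE, UNSUBSCRIBED;

    // Assigned in the static block because a constant cannot reference
    // constants declared after it while it is being constructed.
    List<LeaveSketchState> previousValidStates = Collections.emptyList();

    static {
        STABLE.previousValidStates = Arrays.asList(RECONCILING);
        RECONCILING.previousValidStates = Arrays.asList(STABLE);
        PREPARE_LEAVING.previousValidStates = Arrays.asList(STABLE, RECONCILING);
        // Single entry point into LEAVING, whatever the reason for leaving.
        LEAVING.previousValidStates = Arrays.asList(PREPARE_LEAVING);
        UNSUBSCRIBED.previousValidStates = Arrays.asList(LEAVING);
        STALE.previousValidStates = Arrays.asList(LEAVING);
    }
}

// Hypothetical driver that validates every transition against previousValidStates.
class LeaveSketchMachine {
    LeaveSketchState state = LeaveSketchState.STABLE;
    final List<LeaveSketchState> history = new ArrayList<>();

    void transitionTo(LeaveSketchState next) {
        if (!next.previousValidStates.contains(state)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
        history.add(next);
    }

    // Unsubscribe/close: revocation callbacks would run in PREPARE_LEAVING,
    // then the leave heartbeat would be sent while LEAVING.
    void leaveGroup() {
        transitionTo(LeaveSketchState.PREPARE_LEAVING);
        transitionTo(LeaveSketchState.LEAVING);
    }

    // Poll timer expiry: pass through PREPARE_LEAVING without doing any work there.
    // The leave heartbeat would be sent while LEAVING, and the assignment released once STALE.
    void onPollTimerExpired() {
        transitionTo(LeaveSketchState.PREPARE_LEAVING);
        transitionTo(LeaveSketchState.LEAVING);
        transitionTo(LeaveSketchState.STALE);
    }
}
```

The empty pass through `PREPARE_LEAVING` costs one extra transition. In exchange, the `LEAVING` entry in the table stays a single line and needs no explanatory comment.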
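On the `leaveGroup()` review comment: a single "not in group" check can make `leaveGroup()` a no-op whenever the member has no assignment left to revoke. This avoids following `onPartitionsLost` with `onPartitionsRevoked`. The helper `notInGroup()`, the class `LeaveGuardSketch`, and the exact set of states it covers (including FENCED) are assumptions of this sketch, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, reduced state set; not the real MemberState.
enum GuardSketchState { STABLE, FENCED, STALE, UNSUBSCRIBED, FATAL, PREPARE_LEAVING }

// Hypothetical sketch of a leaveGroup() guarded by a single "not in group" check.
class LeaveGuardSketch {
    GuardSketchState state;
    // Records which rebalance callbacks this sketch would have triggered.
    final List<String> invokedCallbacks = new ArrayList<>();

    LeaveGuardSketch(GuardSketchState initial) {
        state = initial;
    }

    // Hypothetical helper; which states belong here is an assumption of this sketch.
    boolean notInGroup() {
        return state == GuardSketchState.UNSUBSCRIBED
                || state == GuardSketchState.FENCED
                || state == GuardSketchState.FATAL
                || state == GuardSketchState.STALE;
    }

    CompletableFuture<Void> leaveGroup() {
        if (notInGroup()) {
            // No assignment is held any more (e.g. it was already given up through
            // onPartitionsLost), so there is nothing to revoke.
            return CompletableFuture.completedFuture(null);
        }
        state = GuardSketchState.PREPARE_LEAVING;
        invokedCallbacks.add("onPartitionsRevoked");
        // A real member would complete this future only after the callbacks finish.
        return CompletableFuture.completedFuture(null);
    }
}
```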