lucasbru commented on code in PR #16569: URL: https://github.com/apache/kafka/pull/16569#discussion_r1682463147
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -459,8 +464,30 @@ public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData response) { } @Override - public void onHeartbeatFailure() { - metricsManager.maybeRecordRebalanceFailed(); + public void onHeartbeatFailure(boolean retriable) { + if (!retriable) { + metricsManager.maybeRecordRebalanceFailed(); + } + // The leave group request is sent out once (not retried), so we should complete the leave + // operation once the request completes, regardless of the response. + if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) { + log.warn("Member {} with epoch {} received a failed response to the heartbeat to " + + "leave the group and completed the leave operation. ", memberId, memberEpoch); + } + } + + /** + * Complete the leave in progress (if any), if the member is UNSUBSCRIBED. This is expected to Review Comment: I think you may want to fix the Javadoc then, as it specifically mentions the member needs to be `UNSUBSCRIBED`. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -166,33 +167,13 @@ public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { createMembershipManagerJoiningGroup(); } - @Test - public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() { Review Comment: Oops, that's on me I suppose ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -587,6 +617,14 @@ public void transitionToFatal() { return; } + if (previousState == MemberState.LEAVING || previousState == MemberState.PREPARE_LEAVING) { + log.info("Member {} with epoch {} was leaving the group with state {} when it got a " + + "fatal error from the broker. It will discard the ongoing leave and remain in {} " + + "state.", memberId, memberEpoch, previousState, state); + maybeCompleteLeaveInProgress(); Review Comment: nit: I think `state` will always be `MemberState.FATAL` here, so I would use it directly (as in the log messages above) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org