lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396006392
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. 
+ memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. } + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. 
+ */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. + subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); Review Comment: I moved the cache clearing close to the `assignFromSubscribed` call in this manager, as a first step to align both and have a consistent usage -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org