dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394032677
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); } + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public void transitionToJoining() { + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + // Reset member ID of the reconciliation in progress (if any), to make sure that if the + // reconciliation completes while the member is rejoining but hasn't received the new + // member ID yet, the reconciliation result is discarded. + memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + @Override + public CompletableFuture<Void> leaveGroup() { + transitionTo(MemberState.PREPARE_LEAVING); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + + // Clear the subscription, no matter if the callback execution failed or succeeded. + subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + + }); + + clearPendingAssignmentsAndLocalNamesCache(); + + // Return callback future to indicate that the leave group is done when the callbacks + // complete, without waiting for the heartbeat to be sent out. (Best effort to send it + // but do not hold the leave group operation for it) + return callbackResult; + } + + /** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. + */ + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. + callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + // Remove all topic IDs and names from local cache + callbackResult.whenComplete((result, error) -> clearPendingAssignmentsAndLocalNamesCache()); + } + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat + * request is sent out with it. + */ + private void transitionToSendingLeaveGroup() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment = new HashSet<>(); + transitionTo(MemberState.LEAVING); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldHeartbeatNow() { + MemberState state = state(); + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; + } + + /** + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSent() { + MemberState state = state(); + if (state == MemberState.ACKNOWLEDGING) { + if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); + } else { + transitionTo(MemberState.RECONCILING); + } + } else if (state == MemberState.LEAVING) { + transitionTo(MemberState.UNSUBSCRIBED); } - return state.equals(MemberState.STABLE); } /** - * Take new target assignment received from the server and set it as targetAssignment to be - * processed. Following the consumer group protocol, the server won't send a new target - * member while a previous one hasn't been acknowledged by the member, so this will fail - * if a target assignment already exists. + * @return True if there are no assignments waiting to be resolved from metadata or reconciled. + */ + private boolean allPendingAssignmentsReconciled() { + return assignmentUnresolved.isEmpty() && assignmentReadyToReconcile.isEmpty(); + } + + @Override + public boolean shouldSkipHeartbeat() { + MemberState state = state(); + return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; + } + + /** + * Reconcile the assignment that has been received from the server and for which topic names + * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, + * trigger the callbacks and update the subscription state. Note that only one reconciliation + * can be in progress at a time. If there is already another one in progress when this is + * triggered, it will be no-op, and the assignment will be reconciled on the next + * reconciliation loop. + */ + boolean reconcile() { + // Make copy of the assignment to reconcile as it could change as new assignments or metadata updates are received + SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR); + assignedPartitions.addAll(assignmentReadyToReconcile); + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + boolean sameAssignmentReceived = assignedPartitions.equals(ownedPartitions); + + if (reconciliationInProgress || sameAssignmentReceived) { + String reason; + if (reconciliationInProgress) { + reason = "Another reconciliation is already in progress. Assignment " + + assignmentReadyToReconcile + " will be handled in the next reconciliation loop."; + } else { + reason = "Target assignment ready to reconcile is equals to the member current assignment."; + } + log.debug("Ignoring reconciliation attempt. " + reason); + return false; + } + + markReconciliationInProgress(); + + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR); + addedPartitions.addAll(assignedPartitions); + addedPartitions.removeAll(ownedPartitions); + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedPartitions); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). + commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(__ -> { + boolean memberHasRejoined = !Objects.equals(memberIdOnReconciliationStart, + memberId); + if (state == MemberState.RECONCILING && !memberHasRejoined) { + + // Apply assignment + CompletableFuture<Void> assignResult = assignPartitions(assignedPartitions, addedPartitions); Review Comment: btw, it seems that we could have transitioned to another state while waiting on this one as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org