lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1408354526


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -168,76 +329,686 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
         this.memberId = response.memberId();
         this.memberEpoch = response.memberEpoch();
         ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+
         if (assignment != null) {
-            setTargetAssignment(assignment);
+            transitionTo(MemberState.RECONCILING);
+            replaceUnresolvedAssignmentWithNewAssignment(assignment);
+            resolveMetadataForUnresolvedAssignment();
+            reconcile();
+        } else if (allPendingAssignmentsReconciled()) {
+            transitionTo(MemberState.STABLE);
         }
-        maybeTransitionToStable();
+    }
+
+    /**
+     * Overwrite collection of unresolved topic Ids with the new target 
assignment. This will
+     * effectively achieve the following:
+     *
+     *    - all topics received in assignment will try to be resolved to find 
their topic names
+     *
+     *    - any topic received in a previous assignment that was still 
unresolved, and that is
+     *    not included in the assignment anymore, will be removed from the 
unresolved collection.
+     *    This should be the case when a topic is sent in an assignment, 
deleted right after, and
+     *    removed from the assignment the next time a broker sends one to the 
member.
+     *
+     * @param assignment Target assignment received from the broker.
+     */
+    private void replaceUnresolvedAssignmentWithNewAssignment(
+            ConsumerGroupHeartbeatResponseData.Assignment assignment) {
+        assignmentUnresolved.clear();
+        assignment.topicPartitions().forEach(topicPartitions ->
+                assignmentUnresolved.put(topicPartitions.topicId(), 
topicPartitions.partitions()));
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
     public void transitionToFenced() {
-        resetEpoch();
         transitionTo(MemberState.FENCED);
+        resetEpoch();
+        log.debug("Member {} with epoch {} transitioned to {} state. It will 
release its " +
+                "assignment and rejoin the group.", memberId, memberEpoch, 
MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        " after member got fenced. Member will rejoin the 
group anyways.", error);
+            }
+            updateSubscription(Collections.emptySet(), true);
+            transitionToJoining();
+        });
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        transitionTo(MemberState.FATAL);
+        log.error("Member {} with epoch {} transitioned to {} state", 
memberId, memberEpoch, MemberState.FATAL);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+            updateSubscription(Collections.emptySet(), true);
+        });
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+    }
+
+    /**
+     * Update a new assignment by setting the assigned partitions in the 
member subscription.
+     *
+     * @param assignedPartitions Topic partitions to take as the new 
subscription assignment
+     * @param clearAssignments True if the pending assignments and the local topic names cache should also be cleared
+     */
+    private void updateSubscription(Collection<TopicPartition> 
assignedPartitions,
+                                    boolean clearAssignments) {
+        subscriptions.assignFromSubscribed(assignedPartitions);
+        if (clearAssignments) {
+            clearPendingAssignmentsAndLocalNamesCache();
+        }
+    }
+
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        registerForMetadataUpdates();
+    }
+
+    /**
+     * Register to get notified when the cluster metadata is updated, via the
+     * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
registered already.
+     */
+    private void registerForMetadataUpdates() {
+        if (!isRegisteredForMetadataUpdates) {
+            this.metadata.addClusterUpdateListener(this);
+            isRegisteredForMetadataUpdates = true;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public CompletableFuture<Void> leaveGroup() {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+            // Member is not part of the group. No-op and return completed 
future to avoid
+            // unnecessary transitions.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+            // Member already leaving. No-op and return existing leave group 
future that will
+            // complete when the ongoing leave operation completes.
+            return leaveGroupInProgress.get();
+        }
+
+        transitionTo(MemberState.PREPARE_LEAVING);
+        CompletableFuture<Void> leaveResult = new CompletableFuture<>();
+        leaveGroupInProgress = Optional.of(leaveResult);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            updateSubscription(Collections.emptySet(), true);
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+        });
+
+        // Return future to indicate that the leave group is done when the 
callbacks
+        // complete, and the transition to send the heartbeat has been made.
+        return leaveResult;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                log.debug("Member {} with epoch {} transitioned to {} after a 
heartbeat was sent " +
+                        "to ack a previous reconciliation. New assignments are 
ready to " +
+                        "be reconciled.", memberId, memberEpoch, 
MemberState.RECONCILING);
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionToUnsubscribed();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSkipped() {
+        if (state == MemberState.LEAVING) {
+            log.debug("Heartbeat for leaving group could not be sent. Member 
{} with epoch {} will transition to {}.",
+                    memberId, memberEpoch, MemberState.UNSUBSCRIBED);
+            transitionToUnsubscribed();
+        }
+    }
+
+    private void transitionToUnsubscribed() {
+        transitionTo(MemberState.UNSUBSCRIBED);
+        leaveGroupInProgress.get().complete(null);
+        leaveGroupInProgress = Optional.empty();
+    }
+
+    /**
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicIdPartition> assignedTopicIdPartitions = new 
TreeSet<>(TOPIC_ID_PARTITION_COMPARATOR);
+        assignedTopicIdPartitions.addAll(assignmentReadyToReconcile);
+
+        SortedSet<TopicPartition> ownedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        // Keep copy of assigned TopicPartitions created from the 
TopicIdPartitions that are
+        // being reconciled. Needed for interactions with the centralized 
subscription state that
+        // does not support topic IDs yet, and for the callbacks.
+        SortedSet<TopicPartition> assignedTopicPartition = 
toTopicPartitionSet(assignedTopicIdPartitions);
+
+        // Check same assignment. Based on topic names for now, until topic 
IDs are properly
+        // supported in the centralized subscription state object.
+        boolean sameAssignmentReceived = 
assignedTopicPartition.equals(ownedPartitions);
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt. " + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        addedPartitions.addAll(assignedTopicPartition);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedTopicPartition);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedTopicIdPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(__ -> {
+                    boolean memberHasRejoined = 
memberEpochOnReconciliationStart != memberEpoch;
+                    if (state == MemberState.RECONCILING && 
!memberHasRejoined) {
+                        // Apply assignment
+                        CompletableFuture<Void> assignResult = 
assignPartitions(assignedTopicPartition,
+                                addedPartitions);
+
+                        // Clear topic names cache only for topics that are 
not in the subscription anymore
+                        for (TopicPartition tp : revokedPartitions) {
+                            if 
(!subscriptions.subscription().contains(tp.topic())) {
+                                
assignedTopicNamesCache.values().remove(tp.topic());
+                            }
+                        }

Review Comment:
   Totally, and it actually made me realize this could be simplified further by 
retaining only the assigned topic names. It is now included in the [follow-up 
PR](https://github.com/apache/kafka/pull/14857) along with the other minor fixes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to