lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1396474166


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +348,560 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make 
sure that if the
+        // reconciliation completes while the member is rejoining but hasn't 
received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture<Void> leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done 
when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best 
effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
+        }
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
         }
-        return state.equals(MemberState.STABLE);
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(COMPARATOR);
+        assignedPartitions.addAll(assignmentReadyToReconcile);
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(ownedPartitions);
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt. " + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+        addedPartitions.addAll(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(__ -> {
+                    boolean memberHasRejoined = 
!Objects.equals(memberIdOnReconciliationStart,
+                            memberId);
+                    if (state == MemberState.RECONCILING && 
!memberHasRejoined) {
+
+                        // Apply assignment
+                        CompletableFuture<Void> assignResult = 
assignPartitions(assignedPartitions, addedPartitions);
+
+                        // Clear topic names cache only for topics that are 
not in the subscription anymore
+                        for (TopicPartition tp : revokedPartitions) {
+                            if 
(!subscriptions.subscription().contains(tp.topic())) {
+                                
assignedTopicNamesCache.values().remove(tp.topic());
+                            }
+                        }
+
+                        return assignResult;
+
+                    } else {
+                        String reason;
+                        if (state != MemberState.RECONCILING) {
+                            reason = "The member already transitioned out of 
the reconciling " +
+                                    "state into " + state;
+                        } else {
+                            reason = "The member has re-joined the group.";
+                        }
+                        // Revocation callback completed but the reconciled 
assignment should not
+                        // be applied (not relevant anymore). This could be 
because the member
+                        // is not in the RECONCILING state anymore (fenced, 
failed, unsubscribed),
+                        // or because it has already re-joined the group.
+                        CompletableFuture<Void> res = new 
CompletableFuture<>();
+                        res.completeExceptionally(new 
KafkaException("Interrupting reconciliation" +
+                                " after revocation. " + reason));
+                        return res;
+                    }
+                });
+
+        reconciliationResult.whenComplete((result, error) -> {
+            markReconciliationCompleted();
+            if (error != null) {
+                // Leaving member in RECONCILING state after callbacks fail. 
The member
+                // won't send the ack, and the expectation is that the broker 
will kick the
+                // member out of the group after the rebalance timeout 
expires, leading to a
+                // RECONCILING -> FENCED transition.
+                log.error("Reconciliation failed. ", error);
+            } else {
+                if (state == MemberState.RECONCILING) {
+
+                    // Make assignment effective on the broker by 
transitioning to send acknowledge.
+                    transitionTo(MemberState.ACKNOWLEDGING);
+
+                    // Make assignment effective on the member group manager
+                    currentAssignment = assignedPartitions;
+
+                    // Indicate that we completed reconciling a subset of the 
assignment ready to
+                    // reconcile (new assignments might have been received or 
discovered in
+                    // metadata)
+                    assignmentReadyToReconcile.removeAll(assignedPartitions);
+
+                } else {
+                    log.debug("New assignment processing completed but the 
member already " +
+                            "transitioned out of the reconciliation state into 
{}. Interrupting " +
+                            "reconciliation as it's not relevant anymore,", 
state);
+                    // TODO: double check if subscription state changes 
needed. This is expected to be
+                    //  the case where the member got fenced, failed or 
unsubscribed while the
+                    //  reconciliation was in process. Transitions to those 
states update the
+                    //  subscription state accordingly so it shouldn't be 
necessary to make any changes
+                    //  to the subscription state at this point.
+                }
+            }
+        });
+
+        return true;
+    }
+
+    /**
+     *  Visible for testing.
+     */
+    void markReconciliationInProgress() {
+        reconciliationInProgress = true;
+        memberIdOnReconciliationStart = memberId;
+    }
+
+    /**
+     *  Visible for testing.
+     */
+    void markReconciliationCompleted() {
+        reconciliationInProgress = false;
+    }
+
+    /**
+     * Build set of TopicPartition (topic name and partition id) from the 
target assignment
+     * received from the broker (topic IDs and list of partitions).
+     *
+     * <p>
+     * This will:
      *
-     * @throws IllegalStateException If a target assignment already exists.
+     * <ol type="1">
+     *     <li>Try to find topic names in the metadata cache</li>
+     *     <li>For topics not found in metadata, try to find names in the 
local topic names cache
+     *     (contains topic id and names currently assigned and resolved)</li>
+     *     <li>If there are topics that are not in metadata cache or in the 
local cached
+     *     of topic names assigned to this member, request a metadata update, 
and continue
+     *     resolving names as the cache is updated.
+     *     </li>
+     * </ol>
      */
-    private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
-        if (!targetAssignment.isPresent()) {
-            log.info("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
-            targetAssignment = Optional.of(newTargetAssignment);
+    private void resolveMetadataForUnresolvedAssignment() {
+
+        // Try to resolve topic names from metadata cache or subscription 
cache, and move
+        // assignments from the "unresolved" collection, to the 
"readyToReconcile" one.
+        Iterator<Map.Entry<Uuid, List<Integer>>> it = 
assignmentUnresolved.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<Uuid, List<Integer>> e = it.next();
+            Uuid topicId = e.getKey();
+            List<Integer> topicPartitions = e.getValue();
+
+            Optional<String> nameFromMetadata = 
findTopicNameInGlobalOrLocalCache(topicId);
+            if (nameFromMetadata.isPresent()) {
+                // Name resolved, so assignment is ready for reconciliation.
+                
assignmentReadyToReconcile.addAll(buildAssignedPartitionsWithTopicName(topicPartitions,
+                        nameFromMetadata.get()));
+                it.remove();
+            }
+        }
+
+        if (!assignmentUnresolved.isEmpty()) {
+            log.debug("Topic Ids {} received in target assignment were not 
found in metadata and " +
+                    "are not currently assigned. Requesting a metadata update 
now to resolve " +
+                    "topic names.", assignmentUnresolved.keySet());
+            metadata.requestUpdate(true);

Review Comment:
   Makes sense, [KAFKA-15847](https://issues.apache.org/jira/browse/KAFKA-15847)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to