lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361271426


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
         return state.equals(MemberState.STABLE);
     }
 
+    /**
+     * Take new target assignment received from the server and set it as 
targetAssignment to be
+     * processed. Following the consumer group protocol, the server won't send 
a new target
+     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
+     * if a target assignment already exists.
+     *
+     * @throws IllegalStateException If a target assignment already exists.
+     */
     private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
         if (!targetAssignment.isPresent()) {
+            log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
             targetAssignment = Optional.of(newTargetAssignment);
         } else {
-            // Keep the latest next target assignment
-            nextTargetAssignment = Optional.of(newTargetAssignment);
+            transitionToFailed();
+            throw new IllegalStateException("A target assignment pending to be 
reconciled already" +
+                    " exists.");
         }
     }
 
-    private boolean hasPendingTargetAssignment() {
-        return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
-    }
-
-
-    /**
-     * Update state and assignment as the member has successfully processed a 
new target
-     * assignment.
-     * This indicates the end of the reconciliation phase for the member, and 
makes the target
-     * assignment the new current assignment.
-     *
-     * @param assignment Target assignment the member was able to successfully 
process
-     */
-    public void 
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-        updateAssignment(assignment);
-        transitionTo(MemberState.STABLE);
-    }
-
     /**
-     * Update state and member info as the member was not able to process the 
assignment, due to
-     * errors in the execution of the user-provided callbacks.
-     *
-     * @param error Exception found during the execution of the user-provided 
callbacks
+     * Returns true if the member has a target assignment being processed.
      */
-    public void onAssignmentProcessFailure(Throwable error) {
-        transitionTo(MemberState.FAILED);
-        // TODO: update member info appropriately, to clear up whatever 
shouldn't be kept in
-        //  this unrecoverable state
+    private boolean hasPendingTargetAssignment() {
+        return targetAssignment.isPresent();
     }
 
     private void resetEpoch() {
         this.memberEpoch = 0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public MemberState state() {
         return state;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public AssignorSelection assignorSelection() {
         return this.assignorSelection;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
         return this.currentAssignment;
     }
 
+
+    /**
+     * Assignment that the member received from the server but hasn't 
completely processed yet.
+     */
     // VisibleForTesting
     Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() 
{
         return targetAssignment;
     }
 
-    // VisibleForTesting
-    Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment() {
-        return nextTargetAssignment;
-    }
-
     /**
-     * Set the current assignment for the member. This indicates that the 
reconciliation of the
-     * target assignment has been successfully completed.
-     * This will clear the {@link #targetAssignment}, and take on the
-     * {@link #nextTargetAssignment} if any.
+     * This indicates that the reconciliation of the target assignment has 
been successfully
+     * completed, so it will make it effective by assigning it to the current 
assignment.
      *
-     * @param assignment Assignment that has been successfully processed as 
part of the
-     *                   reconciliation process.
+     * @params Assignment that has been successfully reconciled. This is 
expected to
+     * match the target assignment defined in {@link #targetAssignment()}
      */
     @Override
-    public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-        this.currentAssignment = assignment;
-        if (!nextTargetAssignment.isPresent()) {
-            targetAssignment = Optional.empty();
-        } else {
-            targetAssignment = Optional.of(nextTargetAssignment.get());
-            nextTargetAssignment = Optional.empty();
+    public void 
onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
+        if (assignment == null) {
+            throw new IllegalArgumentException("Assignment cannot be null");
         }
+        if (!assignment.equals(targetAssignment.orElse(null))) {
+            // This could be simplified to remove the assignment param and 
just assume that what
+            // was reconciled was the targetAssignment, but keeping it 
explicit and failing fast
+            // here to uncover any issues in the interaction of the assignment 
processing logic
+            // and this.
+            throw new IllegalStateException(String.format("Reconciled 
assignment %s does not " +
+                            "match the initial target assignment %s", 
assignment, targetAssignment.orElse(null)));
+        }
+        this.currentAssignment = assignment;
+        targetAssignment = Optional.empty();
         maybeTransitionToStable();

Review Comment:
   Agree that we can directly transition to stable here. Done.
   
   As for triggering the HB when needed (outside of the interval), @kirktrue, 
@philipnee and I just synced and are considering basing it on something like 
the existing `shouldHeartbeatNow`, as you pointed out, with the revised approach 
of having the reconciliation logic triggered from within the 
`membershipManager` transition to reconciling, and updating it when the 
transition completes. 
   
   The [reconciler PR](https://github.com/apache/kafka/pull/14357) will propose 
the approach for better aligning the components and interactions, but this is 
what we agreed on:
    
   - HB manager using the membershipManager to trigger transitions, and to 
check if it shouldSendHeartbeatNow, that's it.  
   - MembershipManager triggering the reconciliation logic and updating the 
state to indicate it shouldSendHeartbeatNow accordingly when the reconciliation 
completes.
   - Reconciler only responsible for determining what to reconcile (diff between 
subscription and new assignment) and invoking callbacks
   
   These are rough ideas we discussed; it will all be part of the 
[reconciliation PR](https://github.com/apache/kafka/pull/14357), so we can 
follow up on the changes/conversation there.
     



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to