lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361404673


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
         return state.equals(MemberState.STABLE);
     }
 
+    /**
+     * Take new target assignment received from the server and set it as 
targetAssignment to be
+     * processed. Following the consumer group protocol, the server won't send 
a new target
+     * assignment while a previous one hasn't been acknowledged by the member, so 
this will fail
+     * if a target assignment already exists.
+     *
+     * @throws IllegalStateException If a target assignment already exists.
+     */
     private void 
setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment 
newTargetAssignment) {
         if (!targetAssignment.isPresent()) {
+            log.debug("Member {} accepted new target assignment {} to 
reconcile", memberId, newTargetAssignment);
             targetAssignment = Optional.of(newTargetAssignment);
         } else {
-            // Keep the latest next target assignment
-            nextTargetAssignment = Optional.of(newTargetAssignment);
+            transitionToFailed();
+            throw new IllegalStateException("A target assignment pending to be 
reconciled already" +
+                    " exists.");
         }
     }
 
-    private boolean hasPendingTargetAssignment() {
-        return targetAssignment.isPresent() || 
nextTargetAssignment.isPresent();
-    }
-
-
-    /**
-     * Update state and assignment as the member has successfully processed a 
new target
-     * assignment.
-     * This indicates the end of the reconciliation phase for the member, and 
makes the target
-     * assignment the new current assignment.
-     *
-     * @param assignment Target assignment the member was able to successfully 
process
-     */
-    public void 
onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment 
assignment) {
-        updateAssignment(assignment);
-        transitionTo(MemberState.STABLE);
-    }
-
     /**
-     * Update state and member info as the member was not able to process the 
assignment, due to
-     * errors in the execution of the user-provided callbacks.
-     *
-     * @param error Exception found during the execution of the user-provided 
callbacks
+     * Returns true if the member has a target assignment being processed.
      */
-    public void onAssignmentProcessFailure(Throwable error) {
-        transitionTo(MemberState.FAILED);
-        // TODO: update member info appropriately, to clear up whatever 
shouldn't be kept in
-        //  this unrecoverable state
+    private boolean hasPendingTargetAssignment() {
+        return targetAssignment.isPresent();
     }
 
     private void resetEpoch() {
         this.memberEpoch = 0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public MemberState state() {
         return state;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
     public AssignorSelection assignorSelection() {
         return this.assignorSelection;
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment assignment() {
+    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
         return this.currentAssignment;
     }
 
+
+    /**
+     * Assignment that the member received from the server but hasn't 
completely processed yet.
+     */
     // VisibleForTesting
     Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() 
{
         return targetAssignment;
     }
 
-    // VisibleForTesting
-    Optional<ConsumerGroupHeartbeatResponseData.Assignment> 
nextTargetAssignment() {
-        return nextTargetAssignment;
-    }
-
     /**
-     * Set the current assignment for the member. This indicates that the 
reconciliation of the
-     * target assignment has been successfully completed.
-     * This will clear the {@link #targetAssignment}, and take on the
-     * {@link #nextTargetAssignment} if any.
+     * This indicates that the reconciliation of the target assignment has 
been successfully

Review Comment:
   When and how this logic is invoked is changing in the [reconciler 
PR](https://github.com/apache/kafka/pull/14357), following our sync to review 
the interactions between HB, reconciler, and manager. I would suggest we wait 
until we settle on that PR and then adjust accordingly. Does that make 
sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to