lianetm commented on code in PR #14413:
URL: https://github.com/apache/kafka/pull/14413#discussion_r1361289406


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() {
         return state.equals(MemberState.STABLE);
     }
 
+    /**
+     * Take new target assignment received from the server and set it as targetAssignment to be
+     * processed. Following the consumer group protocol, the server won't send a new target
+     * member while a previous one hasn't been acknowledged by the member, so this will fail
+     * if a target assignment already exists.
+     *
+     * @throws IllegalStateException If a target assignment already exists.
+     */
     private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) {
         if (!targetAssignment.isPresent()) {
+            log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment);
             targetAssignment = Optional.of(newTargetAssignment);
         } else {
-            // Keep the latest next target assignment
-            nextTargetAssignment = Optional.of(newTargetAssignment);
+            transitionToFailed();
+            throw new IllegalStateException("A target assignment pending to be reconciled already" +

Review Comment:
   Done
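
For readers following the thread, here is a minimal standalone sketch of the behaviour the new Javadoc in the hunk describes: the first target assignment is accepted, and a second one arriving before the first is cleared moves the member to a failed state and throws. It is not the code from the PR. The class name, the `String` stand-in for `ConsumerGroupHeartbeatResponseData.Assignment`, the `acknowledge()` helper, the two-value state enum, and the exception message are all hypothetical and exist only for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in for the membership manager; not the real MembershipManagerImpl.
class TargetAssignmentSketch {

    // Reduced, illustrative state set; the real MemberState has more values.
    enum MemberState { RECONCILING, FAILED }

    // A String stands in for ConsumerGroupHeartbeatResponseData.Assignment.
    private Optional<String> targetAssignment = Optional.empty();
    private MemberState state = MemberState.RECONCILING;

    // Accept a new target only when no previous target is still pending.
    void setTargetAssignment(String newTargetAssignment) {
        if (!targetAssignment.isPresent()) {
            targetAssignment = Optional.of(newTargetAssignment);
        } else {
            // Fail first, then throw, mirroring the order shown in the diff.
            state = MemberState.FAILED;
            // Illustrative message only; the real message is truncated in the quoted hunk.
            throw new IllegalStateException("Target assignment already pending (illustrative message)");
        }
    }

    // Hypothetical helper: clears the pending target once it has been acknowledged.
    void acknowledge() {
        targetAssignment = Optional.empty();
    }

    Optional<String> targetAssignment() {
        return targetAssignment;
    }

    MemberState state() {
        return state;
    }

    public static void main(String[] args) {
        TargetAssignmentSketch member = new TargetAssignmentSketch();
        member.setTargetAssignment("topicA-0");
        try {
            member.setTargetAssignment("topicA-1");
        } catch (IllegalStateException e) {
            System.out.println("Rejected second target, state is " + member.state());
        }
    }
}
```

Under these assumptions, calling `acknowledge()` between the two `setTargetAssignment` calls lets the second call succeed, which matches the protocol expectation stated in the Javadoc that a new target is only sent after the previous one has been acknowledged.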


