lianetm commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1361289406
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org