lianetm commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1361404673
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + + " exists."); } } - private boolean hasPendingTargetAssignment() { - return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); - } - - - /** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process - */ - public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - updateAssignment(assignment); - transitionTo(MemberState.STABLE); - } - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. 
- * - * @param error Exception found during the execution of the user-provided callbacks + * Returns true if the member has a target assignment being processed. */ - public void onAssignmentProcessFailure(Throwable error) { - transitionTo(MemberState.FAILED); - // TODO: update member info appropriately, to clear up whatever shouldn't be kept in - // this unrecoverable state + private boolean hasPendingTargetAssignment() { + return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } + /** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } + /** + * {@inheritDoc} + */ @Override public AssignorSelection assignorSelection() { return this.assignorSelection; } + /** + * {@inheritDoc} + */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment assignment() { + public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } + + /** + * Assignment that the member received from the server but hasn't completely processed yet. + */ // VisibleForTesting Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() { return targetAssignment; } - // VisibleForTesting - Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() { - return nextTargetAssignment; - } - /** - * Set the current assignment for the member. This indicates that the reconciliation of the - * target assignment has been successfully completed. - * This will clear the {@link #targetAssignment}, and take on the - * {@link #nextTargetAssignment} if any. + * This indicates that the reconciliation of the target assignment has been successfully Review Comment: When and how this logic is invoked is changing in the [reconciler PR](https://github.com/apache/kafka/pull/14357), following our sync to review the interactions between HB-reconciler-manager. I would suggest we wait until we settle on that PR and adjust there according to the changes. Does that make sense?
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org