lianetm commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1361271426
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + + " exists."); } } - private boolean hasPendingTargetAssignment() { - return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); - } - - - /** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process - */ - public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - updateAssignment(assignment); - transitionTo(MemberState.STABLE); - } - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. - * - * @param error Exception found during the execution of the user-provided callbacks + * Returns true if the member has a target assignment being processed. */ - public void onAssignmentProcessFailure(Throwable error) { - transitionTo(MemberState.FAILED); - // TODO: update member info appropriately, to clear up whatever shouldn't be kept in - // this unrecoverable state + private boolean hasPendingTargetAssignment() { + return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } + /** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } + /** + * {@inheritDoc} + */ @Override public AssignorSelection assignorSelection() { return this.assignorSelection; } + /** + * {@inheritDoc} + */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment assignment() { + public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } + + /** + * Assignment that the member received from the server but hasn't completely processed yet. + */ // VisibleForTesting Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() { return targetAssignment; } - // VisibleForTesting - Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() { - return nextTargetAssignment; - } - /** - * Set the current assignment for the member. This indicates that the reconciliation of the - * target assignment has been successfully completed. - * This will clear the {@link #targetAssignment}, and take on the - * {@link #nextTargetAssignment} if any. + * This indicates that the reconciliation of the target assignment has been successfully + * completed, so it will make it effective by assigning it to the current assignment. * - * @param assignment Assignment that has been successfully processed as part of the - * reconciliation process. + * @params Assignment that has been successfully reconciled. This is expected to + * match the target assignment defined in {@link #targetAssignment()} */ @Override - public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - this.currentAssignment = assignment; - if (!nextTargetAssignment.isPresent()) { - targetAssignment = Optional.empty(); - } else { - targetAssignment = Optional.of(nextTargetAssignment.get()); - nextTargetAssignment = Optional.empty(); + public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + if (assignment == null) { + throw new IllegalArgumentException("Assignment cannot be null"); } + if (!assignment.equals(targetAssignment.orElse(null))) { + // This could be simplified to remove the assignment param and just assume that what + // was reconciled was the targetAssignment, but keeping it explicit and failing fast + // here to uncover any issues in the interaction of the assignment processing logic + // and this. + throw new IllegalStateException(String.format("Reconciled assignment %s does not " + + "match the initial target assignment %s", assignment, targetAssignment.orElse(null))); + } + this.currentAssignment = assignment; + targetAssignment = Optional.empty(); maybeTransitionToStable(); Review Comment: Agree that we can directly transition to stable here. Done. As for triggering the HB when needed (outside of interval), @kirktrue , @philipnee and I just synced and are considering to base it on the existing `shouldHeartbeatNow` as you pointed out, with the revised approach of having the reconciliation logic triggered from within the `membershipManager` transition to reconciling, and updating it when the transition completes. The [reconciler PR](https://github.com/apache/kafka/pull/14357) will propose the approach for better aligning the components and interactions, but this is what we agreed on: - HB manager using the membershipManager to trigger transitions, and to check if it `shouldSendHeartbeatNow`, that's it. - MembershipManager triggering the reconciliation logic and updating the `shouldSendHeartbeatNow` accordingly when the reconciliation completes. - Reconciler only responsible for determine what to reconcile (diff between subscription and new assignment) & invoke call backs This is all part of the [reconciliation PR](https://github.com/apache/kafka/pull/14357) so we can follow-up the changes/conversation there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org