dajac commented on code in PR #14413: URL: https://github.com/apache/kafka/pull/14413#discussion_r1360767846
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -19,46 +19,84 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.Optional; /** - * Membership manager that maintains group membership for a single member following the new + * Membership manager that maintains group membership for a single member, following the new * consumer group protocol. * <p/> - * This keeps membership state and assignment updated in-memory, based on the heartbeat responses - * the member receives. It is also responsible for computing assignment for the group based on - * the metadata, if the member has been selected by the broker to do so. + * This is responsible for: + * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li> + * <li>Keeping member state as defined in {@link MemberState}.</li> + * <p/> + * Member info and state are updated based on the heartbeat responses the member receives. */ public class MembershipManagerImpl implements MembershipManager { + /** + * ID of the consumer group the member will be part of, provided when creating the current Review Comment: nit: Group ID... ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ########## @@ -21,53 +21,76 @@ import java.util.Optional; /** - * Manages group membership for a single member. + * Manages membership of a single member to a consumer group. + * <p/> * Responsible for: * <li>Keeping member state</li> * <li>Keeping assignment for the member</li> * <li>Computing assignment for the group if the member is required to do so<li/> */ public interface MembershipManager { + /** + * ID of the consumer group the member is part of (or wants to be part of). Review Comment: nit: `Group ID...`? Should we also prefix all those accessors with `@return`? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -196,4 +281,4 @@ private ConsumerGroupHeartbeatResponseData.Assignment createAssignment() { .setPartitions(Arrays.asList(3, 4, 5)) )); } -} +} Review Comment: nit: Let's add an empty line at the end of the file. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + + " exists."); } } - private boolean hasPendingTargetAssignment() { - return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); - } - - - /** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process - */ - public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - updateAssignment(assignment); - transitionTo(MemberState.STABLE); - } - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. - * - * @param error Exception found during the execution of the user-provided callbacks + * Returns true if the member has a target assignment being processed. */ - public void onAssignmentProcessFailure(Throwable error) { - transitionTo(MemberState.FAILED); - // TODO: update member info appropriately, to clear up whatever shouldn't be kept in - // this unrecoverable state + private boolean hasPendingTargetAssignment() { + return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } + /** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } + /** + * {@inheritDoc} + */ @Override public AssignorSelection assignorSelection() { return this.assignorSelection; } + /** + * {@inheritDoc} + */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment assignment() { + public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } + + /** + * Assignment that the member received from the server but hasn't completely processed yet. + */ // VisibleForTesting Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() { return targetAssignment; } - // VisibleForTesting - Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() { - return nextTargetAssignment; - } - /** - * Set the current assignment for the member. This indicates that the reconciliation of the - * target assignment has been successfully completed. - * This will clear the {@link #targetAssignment}, and take on the - * {@link #nextTargetAssignment} if any. + * This indicates that the reconciliation of the target assignment has been successfully + * completed, so it will make it effective by assigning it to the current assignment. * - * @param assignment Assignment that has been successfully processed as part of the - * reconciliation process. + * @params Assignment that has been successfully reconciled. This is expected to + * match the target assignment defined in {@link #targetAssignment()} */ @Override - public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - this.currentAssignment = assignment; - if (!nextTargetAssignment.isPresent()) { - targetAssignment = Optional.empty(); - } else { - targetAssignment = Optional.of(nextTargetAssignment.get()); - nextTargetAssignment = Optional.empty(); + public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + if (assignment == null) { + throw new IllegalArgumentException("Assignment cannot be null"); } + if (!assignment.equals(targetAssignment.orElse(null))) { + // This could be simplified to remove the assignment param and just assume that what + // was reconciled was the targetAssignment, but keeping it explicit and failing fast + // here to uncover any issues in the interaction of the assignment processing logic + // and this. + throw new IllegalStateException(String.format("Reconciled assignment %s does not " + + "match the initial target assignment %s", assignment, targetAssignment.orElse(null))); Review Comment: nit: s/initial/expected? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + + " exists."); } } - private boolean hasPendingTargetAssignment() { - return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); - } - - - /** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process - */ - public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - updateAssignment(assignment); - transitionTo(MemberState.STABLE); - } - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. - * - * @param error Exception found during the execution of the user-provided callbacks + * Returns true if the member has a target assignment being processed. */ - public void onAssignmentProcessFailure(Throwable error) { - transitionTo(MemberState.FAILED); - // TODO: update member info appropriately, to clear up whatever shouldn't be kept in - // this unrecoverable state + private boolean hasPendingTargetAssignment() { + return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } + /** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } + /** + * {@inheritDoc} + */ @Override public AssignorSelection assignorSelection() { return this.assignorSelection; } + /** + * {@inheritDoc} + */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment assignment() { + public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } + + /** + * Assignment that the member received from the server but hasn't completely processed yet. + */ // VisibleForTesting Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment() { return targetAssignment; } - // VisibleForTesting - Optional<ConsumerGroupHeartbeatResponseData.Assignment> nextTargetAssignment() { - return nextTargetAssignment; - } - /** - * Set the current assignment for the member. This indicates that the reconciliation of the - * target assignment has been successfully completed. - * This will clear the {@link #targetAssignment}, and take on the - * {@link #nextTargetAssignment} if any. + * This indicates that the reconciliation of the target assignment has been successfully + * completed, so it will make it effective by assigning it to the current assignment. * - * @param assignment Assignment that has been successfully processed as part of the - * reconciliation process. + * @params Assignment that has been successfully reconciled. This is expected to + * match the target assignment defined in {@link #targetAssignment()} */ @Override - public void updateAssignment(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - this.currentAssignment = assignment; - if (!nextTargetAssignment.isPresent()) { - targetAssignment = Optional.empty(); - } else { - targetAssignment = Optional.of(nextTargetAssignment.get()); - nextTargetAssignment = Optional.empty(); + public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) { + if (assignment == null) { + throw new IllegalArgumentException("Assignment cannot be null"); } + if (!assignment.equals(targetAssignment.orElse(null))) { + // This could be simplified to remove the assignment param and just assume that what + // was reconciled was the targetAssignment, but keeping it explicit and failing fast + // here to uncover any issues in the interaction of the assignment processing logic + // and this. + throw new IllegalStateException(String.format("Reconciled assignment %s does not " + + "match the initial target assignment %s", assignment, targetAssignment.orElse(null))); + } + this.currentAssignment = assignment; + targetAssignment = Optional.empty(); maybeTransitionToStable(); Review Comment: Do we still need `maybeTransitionToStable`? It seems that we could just transition to Stable here as we don't have any pending assignments now. I also wonder how we will signal to the heartbeat manager that it must heartbeat immediately to ack the assignment. How do you see this? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + Review Comment: +1 ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); Review Comment: I am tempted by logging this a info. What do you think? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -19,46 +19,84 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.Optional; /** - * Membership manager that maintains group membership for a single member following the new + * Membership manager that maintains group membership for a single member, following the new * consumer group protocol. * <p/> - * This keeps membership state and assignment updated in-memory, based on the heartbeat responses - * the member receives. It is also responsible for computing assignment for the group based on - * the metadata, if the member has been selected by the broker to do so. + * This is responsible for: + * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li> + * <li>Keeping member state as defined in {@link MemberState}.</li> + * <p/> + * Member info and state are updated based on the heartbeat responses the member receives. */ public class MembershipManagerImpl implements MembershipManager { + /** + * ID of the consumer group the member will be part of, provided when creating the current + * membership manager. + */ private final String groupId; + + /** + * Group instance ID to be used by the member, provided when creating the current membership manager. + */ private final Optional<String> groupInstanceId; + + /** + * Member ID assigned by the server to the member, received in a heartbeat response when + * joining the group specified in {@link #groupId} + */ private String memberId; + + /** + * Current epoch of the member. It will be set to 0 by the member, and provided to the server + * on the heartbeat request, to join the group. It will be then maintained by the server, + * incremented as the member reconciles and acknowledges the assignments it receives. Review Comment: Should we say that it goes back to zero on fatal errors? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -160,92 +228,91 @@ private boolean maybeTransitionToStable() { return state.equals(MemberState.STABLE); } + /** + * Take new target assignment received from the server and set it as targetAssignment to be + * processed. Following the consumer group protocol, the server won't send a new target + * member while a previous one hasn't been acknowledged by the member, so this will fail + * if a target assignment already exists. + * + * @throws IllegalStateException If a target assignment already exists. + */ private void setTargetAssignment(ConsumerGroupHeartbeatResponseData.Assignment newTargetAssignment) { if (!targetAssignment.isPresent()) { + log.debug("Member {} accepted new target assignment {} to reconcile", memberId, newTargetAssignment); targetAssignment = Optional.of(newTargetAssignment); } else { - // Keep the latest next target assignment - nextTargetAssignment = Optional.of(newTargetAssignment); + transitionToFailed(); + throw new IllegalStateException("A target assignment pending to be reconciled already" + + " exists."); } } - private boolean hasPendingTargetAssignment() { - return targetAssignment.isPresent() || nextTargetAssignment.isPresent(); - } - - - /** - * Update state and assignment as the member has successfully processed a new target - * assignment. - * This indicates the end of the reconciliation phase for the member, and makes the target - * assignment the new current assignment. - * - * @param assignment Target assignment the member was able to successfully process - */ - public void onAssignmentProcessSuccess(ConsumerGroupHeartbeatResponseData.Assignment assignment) { - updateAssignment(assignment); - transitionTo(MemberState.STABLE); - } - /** - * Update state and member info as the member was not able to process the assignment, due to - * errors in the execution of the user-provided callbacks. - * - * @param error Exception found during the execution of the user-provided callbacks + * Returns true if the member has a target assignment being processed. */ - public void onAssignmentProcessFailure(Throwable error) { - transitionTo(MemberState.FAILED); - // TODO: update member info appropriately, to clear up whatever shouldn't be kept in - // this unrecoverable state + private boolean hasPendingTargetAssignment() { + return targetAssignment.isPresent(); } private void resetEpoch() { this.memberEpoch = 0; } + /** + * {@inheritDoc} + */ @Override public MemberState state() { return state; } + /** + * {@inheritDoc} + */ @Override public AssignorSelection assignorSelection() { return this.assignorSelection; } + /** + * {@inheritDoc} + */ @Override - public ConsumerGroupHeartbeatResponseData.Assignment assignment() { + public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { return this.currentAssignment; } + + /** + * Assignment that the member received from the server but hasn't completely processed yet. + */ // VisibleForTesting Review Comment: nit: Let move this to the javadoc now that we have it. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -19,56 +19,91 @@ import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; +import org.apache.kafka.common.utils.LogContext; +import org.slf4j.Logger; import java.util.Optional; /** - * Membership manager that maintains group membership for a single member following the new + * Membership manager that maintains group membership for a single member, following the new * consumer group protocol. * <p/> - * This keeps membership state and assignment updated in-memory, based on the heartbeat responses - * the member receives. It is also responsible for computing assignment for the group based on - * the metadata, if the member has been selected by the broker to do so. + * This is responsible for: + * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li> + * <li>Keeping member state as defined in {@link MemberState}.</li> + * <p/> + * Member info and state are updated based on the heartbeat responses the member receives. */ public class MembershipManagerImpl implements MembershipManager { + /** + * ID of the consumer group the member will be part of., provided when creating the current + * membership manager. + */ private final String groupId; + + /** + * Group instance ID to be used by the member, provided when creating the current membership manager. + */ private final Optional<String> groupInstanceId; + + /** + * Member ID assigned by the server to the member, received in a heartbeat response when + * joining the group specified in {@link #groupId} + */ private String memberId; + + /** + * Current epoch of the member. It will be set to 0 by the member, and provided to the server + * on the heartbeat request, to join the group. It will be then maintained by the server, + * incremented as the member reconciles and acknowledges the assignments it receives. + */ private int memberEpoch; + + /** + * Current state of this member a part of the consumer group, as defined in {@link MemberState} + */ private MemberState state; + + /** + * Assignor type selection for the member. If non-null, the member will send its selection to + * the server on the {@link ConsumerGroupHeartbeatRequest}. If null, the server will select a + * default assignor for the member, which the member does not need to track. + */ private AssignorSelection assignorSelection; /** * Assignment that the member received from the server and successfully processed. */ private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; + /** * Assignment that the member received from the server but hasn't completely processed * yet. */ private Optional<ConsumerGroupHeartbeatResponseData.Assignment> targetAssignment; + /** - * Latest assignment that the member received from the server while a {@link #targetAssignment} - * was in process. + * slf4j logger. Review Comment: I agree with keeping the javadoc for consistency but we could remove `slf4j`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org