Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on PR #14690: URL: https://github.com/apache/kafka/pull/14690#issuecomment-1830640924 Follow-up PR https://github.com/apache/kafka/pull/14857 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1408361923 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,113 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. */ -FAILED; +PREPARE_LEAVING, +/** + * Member has committed offsets and releases its assignment, so it stays in this state until + * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This +
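The javadoc in this hunk effectively defines a transition table for the new consumer group member. The Java sketch below encodes that table so the allowed moves can be checked at a glance. It is an illustration under stated assumptions, not the PR's code. `SketchMemberState`, `canTransitionTo` and `sendsHeartbeats` are hypothetical names. `FATAL`, which appears in the `MembershipManagerImpl` hunks rather than in this excerpt, is assumed to be reachable from any other state. `LEAVING -> UNSUBSCRIBED` is also assumed, because the `LEAVING` javadoc is cut off.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the member state machine described by the MemberState javadoc.
// The transition table is inferred from the comments in this PR, not copied from Kafka.
enum SketchMemberState {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL;

    private static final Map<SketchMemberState, Set<SketchMemberState>> NEXT =
        new EnumMap<>(SketchMemberState.class);

    static {
        NEXT.put(UNSUBSCRIBED, EnumSet.of(JOINING));                 // call to subscribe
        NEXT.put(JOINING, EnumSet.of(RECONCILING, STABLE, FENCED, PREPARE_LEAVING));
        NEXT.put(RECONCILING, EnumSet.of(ACKNOWLEDGING, FENCED, PREPARE_LEAVING));
        NEXT.put(ACKNOWLEDGING, EnumSet.of(RECONCILING, STABLE, FENCED, PREPARE_LEAVING));
        NEXT.put(STABLE, EnumSet.of(RECONCILING, FENCED, PREPARE_LEAVING));
        NEXT.put(FENCED, EnumSet.of(JOINING, PREPARE_LEAVING));      // rejoin after onPartitionsLost
        NEXT.put(PREPARE_LEAVING, EnumSet.of(LEAVING));              // commits/callbacks done
        NEXT.put(LEAVING, EnumSet.of(UNSUBSCRIBED));                 // assumption: after epoch -1 heartbeat
        NEXT.put(FATAL, EnumSet.noneOf(SketchMemberState.class));    // terminal
    }

    /** Assumption: every state except FATAL itself may fail with a fatal error. */
    boolean canTransitionTo(SketchMemberState next) {
        if (next == FATAL) {
            return this != FATAL;
        }
        return NEXT.get(this).contains(next);
    }

    /** Per the javadoc: UNSUBSCRIBED never heartbeats, FENCED and PREPARE_LEAVING stop heartbeating. */
    boolean sendsHeartbeats() {
        return this != UNSUBSCRIBED && this != FENCED && this != PREPARE_LEAVING && this != FATAL;
    }
}
```

Keeping the table in one place makes an invalid transition easy to detect, for example by logging or throwing when `canTransitionTo` returns false.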
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1408357542 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -192,6 +221,26 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
             });
     }
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = new ArrayList<>();
+        Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>();
+        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
+            Uuid topicId = topicIdPartition.topicId();
+            if (!partitionsPerTopicId.containsKey(topicId)) {
+                partitionsPerTopicId.put(topicId, new ArrayList<>());
+            }
+            partitionsPerTopicId.get(topicId).add(topicIdPartition.partition());

Review Comment:
Agree. It disappeared anyways after simplifying it all with the use of TopicPartitions.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1408355119 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ##
@@ -192,6 +221,26 @@ private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() {
             });
     }
+    private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Set<TopicIdPartition> topicIdPartitions) {
+        List<ConsumerGroupHeartbeatRequestData.TopicPartitions> result = new ArrayList<>();
+        Map<Uuid, List<Integer>> partitionsPerTopicId = new HashMap<>();
+        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
+            Uuid topicId = topicIdPartition.topicId();
+            if (!partitionsPerTopicId.containsKey(topicId)) {
+                partitionsPerTopicId.put(topicId, new ArrayList<>());
+            }
+            partitionsPerTopicId.get(topicId).add(topicIdPartition.partition());
+        }
+        for (Map.Entry<Uuid, List<Integer>> entry : partitionsPerTopicId.entrySet()) {
+            Uuid topicId = entry.getKey();
+            List<Integer> partitions = entry.getValue();
+            result.add(new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                .setTopicId(topicId)
+                .setPartitions(partitions));
+        }

Review Comment:
Definitely, done in the [follow-up PR](https://github.com/apache/kafka/pull/14857)
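The grouping loop in `buildTopicPartitionsList` builds one `TopicPartitions` entry per topic id, and it can be written more compactly with `Map.computeIfAbsent`. The sketch below is self-contained, so it uses hypothetical stub classes (`TopicIdPartitionStub`, `TopicPartitionsStub`) and `java.util.UUID` in place of Kafka's `TopicIdPartition`, `ConsumerGroupHeartbeatRequestData.TopicPartitions` and `Uuid`. The replies in this thread say the code was reworked in follow-up PR #14857, so this only illustrates the grouping idiom.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for TopicIdPartition (topic id + partition number).
final class TopicIdPartitionStub {
    final UUID topicId;
    final int partition;

    TopicIdPartitionStub(UUID topicId, int partition) {
        this.topicId = topicId;
        this.partition = partition;
    }
}

// Hypothetical stand-in for ConsumerGroupHeartbeatRequestData.TopicPartitions.
final class TopicPartitionsStub {
    final UUID topicId;
    final List<Integer> partitions;

    TopicPartitionsStub(UUID topicId, List<Integer> partitions) {
        this.topicId = topicId;
        this.partitions = partitions;
    }
}

final class HeartbeatTopicPartitionsSketch {
    // Groups partitions by topic id with computeIfAbsent instead of containsKey/put/get.
    static List<TopicPartitionsStub> buildTopicPartitionsList(Set<TopicIdPartitionStub> topicIdPartitions) {
        Map<UUID, List<Integer>> partitionsPerTopicId = new LinkedHashMap<>();
        for (TopicIdPartitionStub tip : topicIdPartitions) {
            partitionsPerTopicId.computeIfAbsent(tip.topicId, id -> new ArrayList<>()).add(tip.partition);
        }
        List<TopicPartitionsStub> result = new ArrayList<>();
        partitionsPerTopicId.forEach((topicId, partitions) ->
            result.add(new TopicPartitionsStub(topicId, partitions)));
        return result;
    }
}
```

A `LinkedHashMap` keeps topics in first-seen order, which makes the output deterministic in tests. The `HashMap` in the diff gives no ordering guarantee, which does not matter for the heartbeat request itself.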
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1408354526 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -168,76 +329,686 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { -setTargetAssignment(assignment); +transitionTo(MemberState.RECONCILING); +replaceUnresolvedAssignmentWithNewAssignment(assignment); +resolveMetadataForUnresolvedAssignment(); +reconcile(); +} else if (allPendingAssignmentsReconciled()) { +transitionTo(MemberState.STABLE); } -maybeTransitionToStable(); +} + +/** + * Overwrite collection of unresolved topic Ids with the new target assignment. This will + * effectively achieve the following: + * + *- all topics received in assignment will try to be resolved to find their topic names + * + *- any topic received in a previous assignment that was still unresolved, and that is + *not included in the assignment anymore, will be removed from the unresolved collection. + *This should be the case when a topic is sent in an assignment, deleted right after, and + *removed from the assignment the next time a broker sends one to the member. + * + * @param assignment Target assignment received from the broker. + */ +private void replaceUnresolvedAssignmentWithNewAssignment( +ConsumerGroupHeartbeatResponseData.Assignment assignment) { +assignmentUnresolved.clear(); +assignment.topicPartitions().forEach(topicPartitions -> +assignmentUnresolved.put(topicPartitions.topicId(), topicPartitions.partitions())); } /** * {@inheritDoc} */ @Override public void transitionToFenced() { -resetEpoch(); transitionTo(MemberState.FENCED); +resetEpoch(); +log.debug("Member {} with epoch {} transitioned to {} state. 
It will release its " + +"assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +" after member got fenced. Member will rejoin the group anyways.", error); +} +updateSubscription(Collections.emptySet(), true); +transitionToJoining(); +}); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +transitionTo(MemberState.FATAL); +log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +updateSubscription(Collections.emptySet(), true); +}); } +/** + * {@inheritDoc} + */ +public void onSubscriptionUpdated() { +if (state == MemberState.UNSUBSCRIBED) { +transitionToJoining(); +} +} + +/** + * Update a new assignment by setting the assigned partitions in the member subscription. 
+ * + * @param assignedPartitions Topic partitions to take as the new subscription assignment + * @param clearAssignments True if the + */ +private void updateSubscription(Collection assignedPartitions, +boolean clearAssignments) { +subscriptions.assignFromSubscribed(assignedPartitions); +if (clearAssignments) { +clearPendingAssignmentsAndLocalNamesCache(); +} +} + +/** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenc
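The javadoc for `replaceUnresolvedAssignmentWithNewAssignment` depends on one detail: clearing the unresolved collection before adding the new target assignment is what drops topics that were unresolved under a previous assignment but are no longer assigned. The sketch below shows that step together with a resolution pass that moves topics with known names out of the unresolved map. `UnresolvedAssignmentSketch`, `resolveMetadata` and the `Function<UUID, Optional<String>>` lookup, which stands in for cluster metadata, are hypothetical. The real `resolveMetadataForUnresolvedAssignment` is not shown in this excerpt.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical sketch: replace the unresolved assignment, then resolve topic names.
final class UnresolvedAssignmentSketch {
    // Topic id -> assigned partitions, for topics whose names are not known yet.
    final Map<UUID, List<Integer>> assignmentUnresolved = new HashMap<>();

    // Clearing first removes stale topics that are absent from the new target assignment.
    void replaceUnresolvedAssignment(Map<UUID, List<Integer>> newTargetAssignment) {
        assignmentUnresolved.clear();
        assignmentUnresolved.putAll(newTargetAssignment);
    }

    // Returns name -> partitions for every topic the lookup can name, removing those
    // topics from the unresolved map; the others stay until metadata catches up.
    Map<String, List<Integer>> resolveMetadata(Function<UUID, Optional<String>> topicNameLookup) {
        Map<String, List<Integer>> resolved = new HashMap<>();
        Iterator<Map.Entry<UUID, List<Integer>>> it = assignmentUnresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<UUID, List<Integer>> entry = it.next();
            Optional<String> name = topicNameLookup.apply(entry.getKey());
            if (name.isPresent()) {
                resolved.put(name.get(), entry.getValue());
                it.remove();
            }
        }
        return resolved;
    }
}
```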
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1408353902 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -168,76 +329,686 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { -setTargetAssignment(assignment); +transitionTo(MemberState.RECONCILING); +replaceUnresolvedAssignmentWithNewAssignment(assignment); +resolveMetadataForUnresolvedAssignment(); +reconcile(); +} else if (allPendingAssignmentsReconciled()) { +transitionTo(MemberState.STABLE); } -maybeTransitionToStable(); +} + +/** + * Overwrite collection of unresolved topic Ids with the new target assignment. This will + * effectively achieve the following: + * + *- all topics received in assignment will try to be resolved to find their topic names + * + *- any topic received in a previous assignment that was still unresolved, and that is + *not included in the assignment anymore, will be removed from the unresolved collection. + *This should be the case when a topic is sent in an assignment, deleted right after, and + *removed from the assignment the next time a broker sends one to the member. + * + * @param assignment Target assignment received from the broker. + */ +private void replaceUnresolvedAssignmentWithNewAssignment( +ConsumerGroupHeartbeatResponseData.Assignment assignment) { +assignmentUnresolved.clear(); +assignment.topicPartitions().forEach(topicPartitions -> +assignmentUnresolved.put(topicPartitions.topicId(), topicPartitions.partitions())); } /** * {@inheritDoc} */ @Override public void transitionToFenced() { -resetEpoch(); transitionTo(MemberState.FENCED); +resetEpoch(); +log.debug("Member {} with epoch {} transitioned to {} state. 
It will release its " + +"assignment and rejoin the group.", memberId, memberEpoch, MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +" after member got fenced. Member will rejoin the group anyways.", error); +} +updateSubscription(Collections.emptySet(), true); +transitionToJoining(); +}); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +transitionTo(MemberState.FATAL); +log.error("Member {} with epoch {} transitioned to {} state", memberId, memberEpoch, MemberState.FATAL); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +updateSubscription(Collections.emptySet(), true); +}); } +/** + * {@inheritDoc} + */ +public void onSubscriptionUpdated() { +if (state == MemberState.UNSUBSCRIBED) { +transitionToJoining(); +} +} + +/** + * Update a new assignment by setting the assigned partitions in the member subscription. 
+ * + * @param assignedPartitions Topic partitions to take as the new subscription assignment + * @param clearAssignments True if the + */ +private void updateSubscription(Collection assignedPartitions, +boolean clearAssignments) { +subscriptions.assignFromSubscribed(assignedPartitions); +if (clearAssignments) { +clearPendingAssignmentsAndLocalNamesCache(); +} +} + +/** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenc
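In this revision of `transitionToFenced()`, the assignment is released and the rejoin is triggered inside `whenComplete`, so the member rejoins whether `onPartitionsLost` succeeds or fails. A minimal sketch of that pattern follows. The class name, its three-value `State` enum, the string partition names, and resetting the epoch to 0 (the epoch the `JOINING` javadoc says is sent while joining) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of the fencing flow: release the assignment via the lost callback,
// then rejoin regardless of whether the callback failed.
final class FencedFlowSketch {
    enum State { STABLE, FENCED, JOINING }

    State state = State.STABLE;
    int memberEpoch = 5;
    final Set<String> assignedPartitions = new TreeSet<>();
    final List<String> errors = new ArrayList<>();

    CompletableFuture<Void> transitionToFenced(Function<Set<String>, CompletableFuture<Void>> onPartitionsLost) {
        state = State.FENCED;
        memberEpoch = 0; // assumed reset value: the member is no longer part of the group
        CompletableFuture<Void> callbackResult = onPartitionsLost.apply(new TreeSet<>(assignedPartitions));
        return callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                errors.add("onPartitionsLost failed, rejoining anyway: " + error);
            }
            assignedPartitions.clear(); // release the assignment on both paths
            state = State.JOINING;      // rejoin the group as a new member
        });
    }
}
```

Because `whenComplete` runs on both the success and the failure path, the cleanup does not need to be duplicated in an `exceptionally` handler. If the callback future is already complete, the action runs immediately on the calling thread.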
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac merged PR #14690: URL: https://github.com/apache/kafka/pull/14690
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on PR #14690: URL: https://github.com/apache/kafka/pull/14690#issuecomment-1818883304 I just merged trunk to fix conflicts. We can merge it when the build completes.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1398998763 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -137,45 +140,71 @@ public HeartbeatRequestManager( } /** - * Determines the maximum wait time until the next poll based on the member's state, and creates a heartbeat - * request. + * This will build a heartbeat request if one must be sent, determined based on the member + * state. A heartbeat is sent in the following situations: + * + * Member is part of the consumer group or wants to join it. + * The heartbeat interval has expired, or the member is in a state that indicates + * that it should heartbeat without waiting for the interval. + * + * This will also determine the maximum wait time until the next poll based on the member's + * state. * - * If the member is without a coordinator or is in a failed state, the timer is set to Long.MAX_VALUE, as there's no need to send a heartbeat. - * If the member cannot send a heartbeat due to either exponential backoff, it will return the remaining time left on the backoff timer. - * If the member's heartbeat timer has not expired, It will return the remaining time left on the - * heartbeat timer. + * If the member is without a coordinator or is in a failed state, the timer is set + * to Long.MAX_VALUE, as there's no need to send a heartbeat. + * If the member cannot send a heartbeat due to either exponential backoff, it will + * return the remaining time left on the backoff timer. + * If the member's heartbeat timer has not expired, It will return the remaining time + * left on the heartbeat timer. * If the member can send a heartbeat, the timer is set to the current heartbeat interval. * + * + * @return {@link PollResult} that includes a heartbeat request if one must be sent, and the + * time to wait until the next poll. 
*/ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { -if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat()) +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) { +membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; +} + +boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); -// TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be -// implemented either with or after the partition reconciliation logic. -if (!heartbeatRequestState.canSendRequest(currentTimeMs)) +if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); +} -this.heartbeatRequestState.onSendAttempt(currentTimeMs); +heartbeatRequestState.onSendAttempt(currentTimeMs); +membershipManager.onHeartbeatRequestSent(); NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(); return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { -// TODO: We only need to send the rebalanceTimeoutMs field once unless the first request failed. +// TODO: extract this logic for building the ConsumerGroupHeartbeatRequestData to a +// stateful builder (HeartbeatState), that will keep the last data sent, and determine +// the fields that changed and need to be included in the next HB (ex. check +// subscriptionState changed from last sent to include assignment). It should also +// ensure that all fields are sent on failure. 
ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData() .setGroupId(membershipManager.groupId()) .setMemberEpoch(membershipManager.memberEpoch()) -.setMemberId(membershipManager.memberId()) .setRebalanceTimeoutMs(rebalanceTimeoutMs); +if (membershipManager.memberId() != null) { +data.setMemberId(membershipManager.memberId()); +} + membershipManager.groupInstanceId().ifPresent(data::setInstanceId); if (this.subscriptions.hasPatternSubscription()) { // TODO: Pass the string to the GC if server side regex is used. } else { data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); +List topicPartition
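The reworked `poll()` combines three checks. It skips entirely when there is no coordinator or when the member state says to skip. Otherwise, it sends if the interval or backoff timer allows it, or if the state requires an immediate heartbeat (for example `ACKNOWLEDGING`, per the `MemberState` javadoc) and no heartbeat is already in flight. The side-effect-free sketch below isolates that decision. `HeartbeatPollSketch`, `Decision` and the parameters are hypothetical, and `heartbeatRequestState.canSendRequest(currentTimeMs)` is simplified to a single remaining-time value.

```java
// Hypothetical, side-effect-free sketch of the heartbeat decision made in poll().
final class HeartbeatPollSketch {
    static final class Decision {
        final boolean sendHeartbeat;
        final long timeUntilNextPollMs;

        Decision(boolean sendHeartbeat, long timeUntilNextPollMs) {
            this.sendHeartbeat = sendHeartbeat;
            this.timeUntilNextPollMs = timeUntilNextPollMs;
        }
    }

    static Decision decide(boolean coordinatorKnown,
                           boolean shouldSkipHeartbeat,
                           boolean shouldHeartbeatNow,
                           boolean requestInFlight,
                           long msUntilIntervalOrBackoffExpires,
                           long heartbeatIntervalMs) {
        if (!coordinatorKnown || shouldSkipHeartbeat) {
            // Nothing to send: the javadoc says the timer is set to Long.MAX_VALUE.
            return new Decision(false, Long.MAX_VALUE);
        }
        // Some states bypass the interval, but never stack a request on one in flight.
        boolean heartbeatNow = shouldHeartbeatNow && !requestInFlight;
        boolean timerExpired = msUntilIntervalOrBackoffExpires <= 0;
        if (!timerExpired && !heartbeatNow) {
            // Wait for the remaining interval or backoff time.
            return new Decision(false, msUntilIntervalOrBackoffExpires);
        }
        // Send now and poll again after a full heartbeat interval.
        return new Decision(true, heartbeatIntervalMs);
    }
}
```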
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396826235 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +}); +subscriptions.assignFromSubscribed(Collections.emptySet()); +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); +} + +/** + * {@inheritDoc} + */ +public void onSubscriptionUpdated() { +if (state == MemberState.UNSUBSCRIBED) { +transitionToJoining(); +} +// TODO: If the member is already part of the group, this should only ensure that the +// updated subscription is included in the next heartbeat request. } +/** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ +void transitionToJoining() { +if (state == MemberState.FATAL) { +log.warn("No action taken to join the group with the updated subscription because " + +"the member is in FATAL state"); +return; +} +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +registerForMetadataUpdates(); +} + +/** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. 
+ */ +private void registerForMetadataUpdates() { +if (!isRegisteredForMetadataUpdates) { +this.metadata.addClusterUpdateListener(this); +isRegisteredForMetadataUpdates = true; +} +} + +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public CompletableFuture leaveGroup() { +if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { +// Member is not part of the group. No-op and return completed future to avoid +// unnecessary transitions. +return CompletableFuture.completedFuture(null); +} + +if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { +// Member already leaving. No-op and return existing leave group future that will +// complete when the ongoing leave operation completes. +return leaveGroupInProgress.get(); +} + +transitionTo(MemberState.PREPARE_LEAVING); +leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscr
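The guard clauses at the top of this `leaveGroup()` revision make the call idempotent. A member outside the group gets an already-completed future, and a member that is already leaving gets the same future as the ongoing leave operation. The sketch below reproduces only those guards. `LeaveGroupSketch` and its reduced `State` enum are hypothetical. When the shared future completes is not modeled, because the rest of the method is truncated here.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the idempotency guards at the start of leaveGroup().
final class LeaveGroupSketch {
    enum State { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, FATAL }

    State state;
    Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();

    LeaveGroupSketch(State initial) {
        this.state = initial;
    }

    CompletableFuture<Void> leaveGroup() {
        if (state == State.UNSUBSCRIBED || state == State.FATAL) {
            // Not part of the group: no-op, already-completed future.
            return CompletableFuture.completedFuture(null);
        }
        if (state == State.PREPARE_LEAVING || state == State.LEAVING) {
            // Already leaving: hand back the future of the ongoing operation.
            return leaveGroupInProgress.get();
        }
        state = State.PREPARE_LEAVING;
        CompletableFuture<Void> future = new CompletableFuture<>();
        leaveGroupInProgress = Optional.of(future);
        return future;
    }
}
```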
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396494887 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
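The rule documented for `invokeOnPartitionsRevokedOrLostToReleaseAssignment` chooses the callback by member epoch. It calls `onPartitionsRevoked` while the member is still in the group (epoch > 0) and `onPartitionsLost` otherwise, which matches the comment in `transitionToFatal()` about updating the epoch so that `onPartitionsLost` is called. The sketch below assumes that an empty assignment short-circuits to a completed future, since the diff is cut off right after the `// No assignment to release` comment. It also uses strings such as `"orders-0"` in place of `TopicPartition`, sorted by a `TreeSet` instead of the PR's `COMPARATOR`. `ReleaseAssignmentSketch` and its parameters are hypothetical.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: pick onPartitionsRevoked or onPartitionsLost based on the epoch.
final class ReleaseAssignmentSketch {
    static CompletableFuture<Void> release(int memberEpoch,
                                           SortedSet<String> assignedPartitions,
                                           Function<SortedSet<String>, CompletableFuture<Void>> onPartitionsRevoked,
                                           Function<SortedSet<String>, CompletableFuture<Void>> onPartitionsLost) {
        SortedSet<String> droppedPartitions = new TreeSet<>(assignedPartitions);
        if (droppedPartitions.isEmpty()) {
            // Assumption: no assignment to release, so no callback is invoked.
            return CompletableFuture.completedFuture(null);
        }
        SortedSet<String> view = Collections.unmodifiableSortedSet(droppedPartitions);
        // epoch > 0: still in the group and leaving intentionally -> revoked.
        // epoch <= 0: already out of the group (fenced or fatal) -> lost.
        return memberEpoch > 0 ? onPartitionsRevoked.apply(view) : onPartitionsLost.apply(view);
    }
}
```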
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396494887 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
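The Javadoc above describes how the assignment is released. Below is a rough, hypothetical sketch of that decision; it is not the PR's code. A non-empty assignment is passed to onPartitionsRevoked when the epoch is > 0 and to onPartitionsLost otherwise, and an empty assignment completes immediately. `AssignmentReleaseSketch`, `releaseAssignment`, the use of plain strings for partitions, and the `String` result label (used instead of `Void` so the chosen path can be observed) are all assumptions made for illustration.

```java
import java.util.Collections;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical sketch: partitions are plain strings and the callbacks are simple Consumers.
final class AssignmentReleaseSketch {

    /**
     * Picks the callback used to release the current assignment. Assumes, as in the Javadoc
     * above, that an epoch > 0 means the member is still part of the group.
     * Returns "revoked", "lost" or "none" so the chosen path is observable.
     */
    static CompletableFuture<String> releaseAssignment(int memberEpoch,
                                                       Set<String> assignedPartitions,
                                                       Consumer<SortedSet<String>> onPartitionsRevoked,
                                                       Consumer<SortedSet<String>> onPartitionsLost) {
        SortedSet<String> dropped = new TreeSet<>(assignedPartitions);
        if (dropped.isEmpty()) {
            // No assignment to release: complete right away without invoking any callback.
            return CompletableFuture.completedFuture("none");
        }
        CompletableFuture<String> result = new CompletableFuture<>();
        SortedSet<String> view = Collections.unmodifiableSortedSet(dropped);
        try {
            if (memberEpoch > 0) {
                // Intentional leave (e.g. after unsubscribe): partitions are revoked.
                onPartitionsRevoked.accept(view);
                result.complete("revoked");
            } else {
                // Not in the group anymore (e.g. after being fenced): partitions are lost.
                onPartitionsLost.accept(view);
                result.complete("lost");
            }
        } catch (RuntimeException e) {
            // Report callback failures through the future so callers can react in whenComplete.
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

Completing the future exceptionally on a callback failure matches how the quoted `whenComplete` handlers log the error and still carry on with the transition.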
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396474166 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396462214 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,11 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { Review Comment: I updated it to align with the current behaviour: run the callbacks, make a best-effort attempt to send the leave group request (without any response handling or retry), and call `subscriptions.unsubscribe` once everything completes. This still leaves a gap: executing the callbacks would require a poll. Since this PR does not support callbacks, the gap won't block the flow, but it definitely needs to be solved (I added the details of the challenge to the [callbacks Jira](https://issues.apache.org/jira/browse/KAFKA-15276)).
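A minimal sketch of the unsubscribe ordering described in this comment, assuming the callbacks are exposed as a `CompletableFuture`. `UnsubscribeSketch`, `runCallbacks`, and `sendLeaveGroupHeartbeat` are hypothetical names, not consumer APIs, and the trace string only stands in for the real side effects.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch; names are illustrative, not the consumer's real API.
final class UnsubscribeSketch {
    private final StringBuilder trace = new StringBuilder();

    CompletableFuture<Void> unsubscribe(Supplier<CompletableFuture<Void>> runCallbacks) {
        return runCallbacks.get().whenComplete((ignored, error) -> {
            // Best effort: send the leave request whether or not the callbacks failed,
            // without waiting for, or retrying on, its response.
            sendLeaveGroupHeartbeat();
            // Clear the local subscription only after everything above has run.
            trace.append("unsubscribed;");
        });
    }

    private void sendLeaveGroupHeartbeat() {
        // Stand-in for enqueueing a heartbeat that tells the coordinator the member is leaving.
        trace.append("leave-sent;");
    }

    String trace() {
        return trace.toString();
    }
}
```

If the callbacks only complete during a later poll, the returned future (and the clearing of the subscription) stays pending until then, which is the gap mentioned in the comment.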
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r139605 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396013115 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +}); +subscriptions.assignFromSubscribed(Collections.emptySet()); +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); +} + +/** + * {@inheritDoc} + */ +public void onSubscriptionUpdated() { +if (state == MemberState.UNSUBSCRIBED) { +transitionToJoining(); +} +// TODO: If the member is already part of the group, this should only ensure that the +// updated subscription is included in the next heartbeat request. } +/** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ +void transitionToJoining() { +if (state == MemberState.FATAL) { +log.warn("No action taken to join the group with the updated subscription because " + +"the member is in FATAL state"); +return; +} +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +registerForMetadataUpdates(); +} + +/** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. 
+ */ +private void registerForMetadataUpdates() { +if (!isRegisteredForMetadataUpdates) { +this.metadata.addClusterUpdateListener(this); +isRegisteredForMetadataUpdates = true; +} +} + +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public CompletableFuture leaveGroup() { +if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { +// Member is not part of the group. No-op and return completed future to avoid +// unnecessary transitions. +return CompletableFuture.completedFuture(null); +} + +if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { +// Member already leaving. No-op and return existing leave group future that will +// complete when the ongoing leave operation completes. +return leaveGroupInProgress.get(); +} + +transitionTo(MemberState.PREPARE_LEAVING); +leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscr
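The `leaveGroup()` guards in the diff above make the operation idempotent: no-op when the member is not in the group, and the same in-progress future when a leave is already underway. A hypothetical sketch of that pattern follows. The quoted diff is cut off before the in-progress future is completed, so `onCallbacksCompleted` and `onLeaveHeartbeatSent` are assumed hooks, and `LeaveGroupSketch` is not a Kafka class.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an idempotent leave operation; not the PR's implementation.
final class LeaveGroupSketch {
    enum State { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, FATAL }

    State state = State.STABLE;
    private Optional<CompletableFuture<Void>> leaveGroupInProgress = Optional.empty();

    CompletableFuture<Void> leaveGroup() {
        if (state == State.UNSUBSCRIBED || state == State.FATAL) {
            // Not part of the group: nothing to do, so avoid unnecessary transitions.
            return CompletableFuture.completedFuture(null);
        }
        if (state == State.PREPARE_LEAVING || state == State.LEAVING) {
            // Already leaving: return the existing future instead of starting a second leave.
            return leaveGroupInProgress.get();
        }
        state = State.PREPARE_LEAVING;
        CompletableFuture<Void> future = new CompletableFuture<>();
        leaveGroupInProgress = Optional.of(future);
        return future;
    }

    // Assumed hook: callbacks finished, so the leave heartbeat can now be sent.
    void onCallbacksCompleted() {
        state = State.LEAVING;
    }

    // Assumed hook: the leave heartbeat went out, so the pending leave is done.
    void onLeaveHeartbeatSent() {
        state = State.UNSUBSCRIBED;
        leaveGroupInProgress.ifPresent(f -> f.complete(null));
        leaveGroupInProgress = Optional.empty();
    }
}
```

Returning the shared future means concurrent callers (for example, `unsubscribe` followed by `close`) all observe the same completion, rather than each triggering its own round of callbacks.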
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1396006392 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395947490 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395929369 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +}); +subscriptions.assignFromSubscribed(Collections.emptySet()); +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); +} + +/** + * {@inheritDoc} + */ +public void onSubscriptionUpdated() { +if (state == MemberState.UNSUBSCRIBED) { +transitionToJoining(); +} +// TODO: If the member is already part of the group, this should only ensure that the +// updated subscription is included in the next heartbeat request. Review Comment: Agreed, the next heartbeat will pick up the updated subscription on the regular interval (I also don't see much need for, or value in, forcing a heartbeat). Removed the TODO.
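To illustrate why no forced heartbeat is needed, here is a hypothetical sketch of interval-driven heartbeats in which a subscription change is only recorded and then carried by whichever heartbeat becomes due next. `HeartbeatIntervalSketch` and its methods are illustrative assumptions, not Kafka's heartbeat manager.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of interval-driven heartbeats; not Kafka's heartbeat manager.
final class HeartbeatIntervalSketch {
    private final long intervalMs;
    private long lastSentMs;
    private List<String> subscribedTopics = Collections.emptyList();

    HeartbeatIntervalSketch(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastSentMs = nowMs;
    }

    // A subscription change is only recorded; it does not force an immediate heartbeat.
    void onSubscriptionUpdated(List<String> topics) {
        subscribedTopics = new ArrayList<>(topics);
    }

    // Returns the topics carried by a heartbeat if one is due at nowMs, otherwise empty.
    Optional<List<String>> maybeHeartbeat(long nowMs) {
        if (nowMs - lastSentMs < intervalMs) {
            return Optional.empty();
        }
        lastSentMs = nowMs;
        return Optional.of(Collections.unmodifiableList(new ArrayList<>(subscribedTopics)));
    }
}
```

The trade-off is latency: the new subscription reaches the coordinator at most one interval later, in exchange for not adding an extra request path.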
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395912856 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +}); +subscriptions.assignFromSubscribed(Collections.emptySet()); +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); Review Comment: No particular reason. I updated it to make it consistent with the fencing transition.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395907406 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member failed with fatal error.", error); +} +}); +subscriptions.assignFromSubscribed(Collections.emptySet()); Review Comment: No particular reason. I moved it so that it runs after the callback completes, consistent with how it is done on fencing, leave, and reconcile.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395894946 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.error("onPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; Review Comment: You're right, it is not needed anymore. The member will transition to the FATAL state but can keep its last member ID and epoch.
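The last few comments converge on one shape for releasing the assignment: invoke onPartitionsLost first and clear the assignment only once the callback completes. Fencing also resets the epoch and rejoins, while FATAL keeps the last member ID and epoch. The sketch below is hypothetical: `FatalTransitionSketch`, its fields, and setting the state before invoking the callback (our reading of "consistent with the fencing transition") are assumptions, not the PR's final code.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch; field and method names are illustrative only.
final class FatalTransitionSketch {
    enum State { STABLE, FENCED, JOINING, FATAL }

    State state = State.STABLE;
    String memberId = "member-1";
    int memberEpoch = 7;
    final Set<String> assigned = new HashSet<>(Collections.singleton("orders-0"));

    // Fencing: reset the epoch, release via onPartitionsLost, then clear and rejoin.
    CompletableFuture<Void> transitionToFenced(Function<Set<String>, CompletableFuture<Void>> onPartitionsLost) {
        memberEpoch = 0;
        state = State.FENCED;
        return onPartitionsLost.apply(new HashSet<>(assigned)).whenComplete((ignored, error) -> {
            assigned.clear();
            state = State.JOINING;
        });
    }

    // Fatal: same ordering, but the last member ID and epoch are kept and the state stays FATAL.
    CompletableFuture<Void> transitionToFatal(Function<Set<String>, CompletableFuture<Void>> onPartitionsLost) {
        state = State.FATAL;
        return onPartitionsLost.apply(new HashSet<>(assigned)).whenComplete((ignored, error) -> assigned.clear());
    }
}
```

Clearing inside `whenComplete` guarantees the callback still sees the partitions it is releasing, whether it succeeds or fails.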
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395877609 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -168,10 +319,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { -setTargetAssignment(assignment); +transitionTo(MemberState.RECONCILING); +replaceUnresolvedAssignmentWithNewAssignment(assignment); +resolveMetadataForUnresolvedAssignment(); +// TODO: improve reconciliation triggering. Initial approach of triggering on every +// HB response and metadata update. Review Comment: Agree, done. All TODOs in this class have jiras already.
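When a heartbeat response carries an assignment, the member does three things:
- moves to RECONCILING,
- replaces its unresolved assignment with the new one,
- tries to resolve topic names from metadata.

The sketch below models only the resolution step. It makes two simplifying assumptions: topic IDs are plain strings, and metadata is an ID-to-name map. `AssignmentResolver`, `replaceAssignment`, and `resolve` are hypothetical names. Topic IDs found in metadata become reconcilable partitions. The rest stay unresolved until a later metadata update.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of topic-ID resolution; not the MembershipManagerImpl code.
class AssignmentResolver {
    // Unresolved target assignment: topic ID -> partitions.
    final Map<String, Set<Integer>> unresolved = new HashMap<>();

    // Each new target assignment replaces whatever was still unresolved.
    void replaceAssignment(Map<String, Set<Integer>> newAssignment) {
        unresolved.clear();
        unresolved.putAll(newAssignment);
    }

    // Moves every topic ID known to metadata out of `unresolved`,
    // returning resolved partitions as "topicName-partition" strings.
    Set<String> resolve(Map<String, String> topicIdToName) {
        Set<String> resolved = new TreeSet<>();
        unresolved.entrySet().removeIf(entry -> {
            String name = topicIdToName.get(entry.getKey());
            if (name == null) {
                return false; // keep it until a metadata update discovers the topic
            }
            for (int partition : entry.getValue()) {
                resolved.add(name + "-" + partition);
            }
            return true;
        });
        return resolved;
    }
}
```

Calling `resolve` again after each metadata update lets previously unknown topic IDs move into the reconcilable set.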
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395862422 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ## @@ -17,253 +17,1059 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; -import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class MembershipManagerImplTest { private static final String GROUP_ID = "test-group"; private static final String MEMBER_ID = "test-member-1"; private static final int 
MEMBER_EPOCH = 1; -private final LogContext logContext = new LogContext(); + +private SubscriptionState subscriptionState; +private ConsumerMetadata metadata; + +private CommitRequestManager commitRequestManager; + +private ConsumerTestBuilder testBuilder; + +@BeforeEach +public void setup() { +testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation()); +metadata = testBuilder.metadata; +subscriptionState = testBuilder.subscriptions; +commitRequestManager = testBuilder.commitRequestManager.get(); +} + +@AfterEach +public void tearDown() { +if (testBuilder != null) { +testBuilder.close(); +} +} + +private MembershipManagerImpl createMembershipManagerJoiningGroup() { +MembershipManagerImpl manager = spy(new MembershipManagerImpl( +GROUP_ID, subscriptionState, commitRequestManager, +metadata, testBuilder.logContext)); +manager.transitionToJoining(); +return manager; +} + +private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupInstanceId, + String serverAssignor) { +MembershipManagerImpl manager = new MembershipManagerImpl( +GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(serverAssignor), +subscriptionState, commitRequestManager, metadata, testBuilder.logContext); +manager.transitionToJoining(); +return manager; +} @Test public void testMembershipManagerServerAssignor() { -MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); +MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); assertEquals(Optional.empty(), membershipManager.serverAssignor()); -membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext); +membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform"); assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor()); } @Test public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { -new MembershipManagerImpl(GROUP_ID, logContext); -new 
MembershipManagerImpl(GROUP_ID, null, null, logContext); +createMembershipManagerJoiningGroup(); +createMembershipManagerJoiningGroup(null, null); +} + +@Test +public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() { +// First join should register to get metadata updates +MembershipManagerImpl manager = new MembershipManagerImpl( +GROUP_ID, subscriptionState, commitRequestManager, +metadata, testBuilder.logContext); +manager.tran
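The test name `testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin` suggests that registration for metadata updates happens on the first join only. The sketch below shows a minimal register-once guard under that assumption. `MetadataRegistry`, `JoiningManager`, and the `registeredForMetadataUpdates` flag are hypothetical stand-ins, not the consumer's metadata classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry standing in for the consumer metadata object.
class MetadataRegistry {
    final List<Runnable> listeners = new ArrayList<>();

    void addListener(Runnable listener) {
        listeners.add(listener);
    }
}

// Hypothetical manager showing a register-once guard on the join path.
class JoiningManager {
    private final MetadataRegistry metadata;
    private boolean registeredForMetadataUpdates = false;
    int metadataUpdatesSeen = 0;

    JoiningManager(MetadataRegistry metadata) {
        this.metadata = metadata;
    }

    void transitionToJoining() {
        if (!registeredForMetadataUpdates) {
            // Only the first join registers; later re-joins (e.g. after fencing) reuse it.
            metadata.addListener(() -> metadataUpdatesSeen++);
            registeredForMetadataUpdates = true;
        }
    }
}
```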
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395677889 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -17,25 +17,93 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; /** - * Membership manager that maintains group membership for a single member, following the new - * consumer group protocol. + * Group manager for a single consumer that has a group id defined in the config + * {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset management capability, + * and the consumer group protocol to get automatically assigned partitions when calling the + * subscribe API. 
+ * + * + * + * While the subscribe API hasn't been called (or if the consumer called unsubscribe), this manager + * will only be responsible for keeping the member in the {@link MemberState#UNSUBSCRIBED} state, + * where it can commit offsets to the group identified by the {@link #groupId()}, without joining + * the group. + * + * + * + * If the consumer subscribe API is called, this manager will use the {@link #groupId()} to join the + * consumer group, and based on the consumer group protocol heartbeats, will handle the full + * lifecycle of the member as it joins the group, reconciles assignments, handles fencing and + * fatal errors, and leaves the group. + * * - * This is responsible for: - * Keeping member info (ex. member id, member epoch, assignment, etc.) - * Keeping member state as defined in {@link MemberState}. + * + * Reconciliation process: + * The member accepts all assignments received from the broker, resolves topic names from + * metadata, reconciles the resolved assignments, and keeps the unresolved to be reconciled when + * discovered with a metadata update. Reconciliations of resolved assignments are executed + * sequentially and acknowledged to the server as they complete. The reconciliation process + * involves multiple async operations, so the member will continue to heartbeat while these + * operations complete, to make sure that the member stays in the group while reconciling. + * * - * Member info and state are updated based on the heartbeat responses the member receives. + * + * Reconciliation steps: + * + * Resolve topic names for all topic IDs received in the target assignment. Topic names + * found in metadata are then ready to be reconciled. Topic IDs not found are kept as + * unresolved, and the member request metadata updates until it resolves them (or the broker + * removes it from the target assignment. + * Commit offsets if auto-commit is enabled. + * Invoke the user-defined onPartitionsRevoked listener. 
+ * Invoke the user-defined onPartitionsAssigned listener. + * When the above steps complete, the member acknowledges the reconciled assignment, + * which is the subset of the target that was resolved from metadata and actually reconciled. + * The ack is performed by sending a heartbeat request back to the broker, including the + * reconciled assignment. + * . Review Comment: nit: I suppose that this one should go on the previous line. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); Review Comment: nit: It may be worth logging something here as well to be consistent with `transitionToFatal`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -168,10 +319,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId();
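The reconciliation steps in the javadoc run in order: commit (if auto-commit is enabled), onPartitionsRevoked, onPartitionsAssigned, then the ack. The sketch below chains them with `CompletableFuture`. It makes three simplifying assumptions:
- Revoked partitions are `current - target` and newly assigned ones are `target - current`.
- Each step completes immediately.
- Partitions are plain strings.

`ReconciliationSketch` and `step` are hypothetical names that record each step instead of calling real listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the reconciliation order described in the javadoc; not Kafka's code.
class ReconciliationSketch {
    final List<String> steps = new ArrayList<>();
    final boolean autoCommitEnabled;
    Set<String> current;

    ReconciliationSketch(boolean autoCommitEnabled, Set<String> current) {
        this.autoCommitEnabled = autoCommitEnabled;
        this.current = new TreeSet<>(current);
    }

    // Records a step; a real implementation would return the async operation's future.
    private CompletableFuture<Void> step(String name) {
        steps.add(name);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Set<String>> reconcile(Set<String> resolvedTarget) {
        Set<String> revoked = new TreeSet<>(current);
        revoked.removeAll(resolvedTarget);
        Set<String> added = new TreeSet<>(resolvedTarget);
        added.removeAll(current);

        CompletableFuture<Void> commit = autoCommitEnabled
                ? step("commit")
                : CompletableFuture.completedFuture(null);
        return commit
                .thenCompose(ignored -> step("onPartitionsRevoked" + revoked))
                .thenCompose(ignored -> step("onPartitionsAssigned" + added))
                .thenApply(ignored -> {
                    // Make the reconciled assignment effective and "ack" it on the next heartbeat.
                    current = new TreeSet<>(resolvedTarget);
                    steps.add("ack" + current);
                    return current;
                });
    }
}
```

Chaining with `thenCompose` keeps each step from starting until the previous one has completed. This matches the sequential execution the javadoc describes, while the caller remains free to keep heartbeating.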
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395669911 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
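`leaveGroup()` as shown in the hunk follows four steps:
1. Move to PREPARE_LEAVING.
2. Release the assignment: onPartitionsRevoked if the epoch is > 0, otherwise onPartitionsLost.
3. Clear the assignment and move on to send the leave heartbeat, even if the callback failed.
4. Return the callback future rather than waiting for the heartbeat.

The sketch below is a minimal model of that flow. `LeaveGroupSketch`, `releaseAssignment`, and `callbackFails` are hypothetical, and callbacks are simulated by recording their names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the leave-group flow; names are illustrative only.
class LeaveGroupSketch {
    enum State { STABLE, PREPARE_LEAVING, LEAVING }

    State state = State.STABLE;
    final int memberEpoch;
    final Set<String> assigned = new TreeSet<>();
    final List<String> invoked = new ArrayList<>();
    boolean callbackFails = false;

    LeaveGroupSketch(int memberEpoch) {
        this.memberEpoch = memberEpoch;
    }

    // Epoch > 0: member is still in the group, so partitions are revoked; otherwise they are lost.
    private CompletableFuture<Void> releaseAssignment() {
        if (assigned.isEmpty()) {
            return CompletableFuture.completedFuture(null); // nothing to release
        }
        invoked.add((memberEpoch > 0 ? "onPartitionsRevoked" : "onPartitionsLost") + assigned);
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (callbackFails) {
            result.completeExceptionally(new RuntimeException("callback failed"));
        } else {
            result.complete(null);
        }
        return result;
    }

    CompletableFuture<Void> leaveGroup() {
        state = State.PREPARE_LEAVING;
        CompletableFuture<Void> callbackResult = releaseAssignment();
        callbackResult.whenComplete((result, error) -> {
            // Clear the assignment and send the leave heartbeat regardless of callback errors.
            assigned.clear();
            state = State.LEAVING;
        });
        // Callers wait for the callbacks only, not for the leave heartbeat itself.
        return callbackResult;
    }
}
```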
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395667965 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
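`transitionToJoining()` sets `memberIdOnReconciliationStart = null`. This ensures that a reconciliation finishing while the member rejoins, but before it has received its new member ID, is discarded.

The sketch below models that guard. It assumes the completion check compares the saved ID with the current `memberId`; the hunk does not show the check itself. `StaleReconciliationGuard` and `acknowledged` are hypothetical names.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of discarding a reconciliation that completes after a rejoin.
class StaleReconciliationGuard {
    String memberId = "member-1";
    private String memberIdOnReconciliationStart;
    boolean acknowledged = false;

    CompletableFuture<Void> reconcile(CompletableFuture<Void> pendingCallbacks) {
        memberIdOnReconciliationStart = memberId;
        return pendingCallbacks.thenRun(() -> {
            // Assumed check: discard if the field was reset by a rejoin or the member ID changed.
            if (Objects.equals(memberIdOnReconciliationStart, memberId)) {
                acknowledged = true;
            }
        });
    }

    void transitionToJoining() {
        // Mirrors the diff: forget the in-flight reconciliation's member ID.
        memberIdOnReconciliationStart = null;
    }
}
```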
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395665723 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,113 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. */ -FAILED; +PREPARE_LEAVING, +/** + * Member has committed offsets and releases its assignment, so it stays in this state until + * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This + *
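The state javadoc above implies a heartbeat policy per state:
- UNSUBSCRIBED, FENCED, and PREPARE_LEAVING send no heartbeats.
- JOINING sends epoch 0 on the interval.
- ACKNOWLEDGING sends without waiting for the interval.
- LEAVING sends epoch -1.

The sketch below encodes that summary. `MemberStateSketch` and its methods are hypothetical, not methods of the real `MemberState` enum. Two points are assumptions rather than facts from the hunk:
- FATAL sends no heartbeats, which mirrors the old `shouldSendHeartbeat()` check on FAILED.
- LEAVING sends immediately; its javadoc is truncated here.

```java
// Hypothetical helper summarizing the heartbeat behaviour described in the MemberState javadoc.
enum MemberStateSketch {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE, FENCED, PREPARE_LEAVING, LEAVING, FATAL;

    /** Whether heartbeats are sent in this state at all. FATAL excluded by assumption. */
    boolean sendsHeartbeats() {
        switch (this) {
            case JOINING:
            case RECONCILING:
            case ACKNOWLEDGING:
            case STABLE:
            case LEAVING:
                return true;
            default:
                return false;
        }
    }

    /** Whether the next heartbeat skips the interval (LEAVING included by assumption). */
    boolean heartbeatImmediately() {
        return this == ACKNOWLEDGING || this == LEAVING;
    }

    /** Epoch carried by the heartbeat: 0 to join, -1 to leave, otherwise the current epoch. */
    int heartbeatEpoch(int memberEpoch) {
        if (this == JOINING) {
            return 0;
        }
        if (this == LEAVING) {
            return -1;
        }
        return memberEpoch;
    }
}
```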
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395143611 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395138981 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,113 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. */ -FAILED; +PREPARE_LEAVING, +/** + * Member has committed offsets and released its assignment, so it stays in this state until + * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This +
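The states described in the Javadoc above can be summarized as a transition table. What follows is a minimal Java sketch under stated assumptions. `SketchMemberState` and `SketchStateMachine` are hypothetical names. The table is inferred from the Javadoc and is not copied from Kafka's `MemberState`. The edges ACKNOWLEDGING to STABLE and LEAVING to UNSUBSCRIBED are assumptions, because the quoted diff is cut off before it describes them.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch: state names follow the MemberState Javadoc; the transition
// table is inferred from that Javadoc and is NOT Kafka's actual implementation.
enum SketchMemberState {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL;

    /** States from which this sketch accepts a transition into this state. */
    Set<SketchMemberState> previousValidStates() {
        switch (this) {
            case JOINING:
                // First call to subscribe, or re-joining after being fenced.
                return EnumSet.of(UNSUBSCRIBED, FENCED);
            case RECONCILING:
                // A target assignment arrived, or assignment is still pending after an ack.
                return EnumSet.of(JOINING, ACKNOWLEDGING, STABLE);
            case ACKNOWLEDGING:
                // Reconciliation finished; the ack goes out on the next heartbeat.
                return EnumSet.of(RECONCILING);
            case STABLE:
                // Assumption: the ack was sent and nothing else is left to reconcile.
                return EnumSet.of(ACKNOWLEDGING);
            case FENCED:
                // UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH while in the group.
                return EnumSet.of(JOINING, RECONCILING, ACKNOWLEDGING, STABLE);
            case PREPARE_LEAVING:
                // Call to unsubscribe while in the group.
                return EnumSet.of(JOINING, RECONCILING, ACKNOWLEDGING, STABLE);
            case LEAVING:
                // Offsets committed and assignment released.
                return EnumSet.of(PREPARE_LEAVING);
            case UNSUBSCRIBED:
                // Assumption: the leave heartbeat (epoch -1) has been sent.
                return EnumSet.of(LEAVING);
            case FATAL:
            default:
                // Unrecoverable error: accepted from any non-fatal state.
                return EnumSet.complementOf(EnumSet.of(FATAL));
        }
    }
}

final class SketchStateMachine {
    private SketchMemberState state = SketchMemberState.UNSUBSCRIBED;

    SketchMemberState state() {
        return state;
    }

    /** Moves to the next state, rejecting transitions the table does not allow. */
    void transitionTo(SketchMemberState next) {
        if (!next.previousValidStates().contains(state)) {
            throw new IllegalStateException("Invalid transition from " + state + " to " + next);
        }
        state = next;
    }
}
```

Rejecting an unexpected transition with an exception, rather than silently accepting it, makes bugs in the heartbeat or reconciliation paths surface early. The quoted diff does not show whether the real client does the same.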
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395138233 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,113 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeats on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective).
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * are already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker). + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group.
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. */ -FAILED; +PREPARE_LEAVING, +/** + * Member has committed offsets and released its assignment, so it stays in this state until + * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This +
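The Javadoc also implies a heartbeat policy for each state:

* No heartbeats while unsubscribed, fenced, or preparing to leave.
* Heartbeats on the interval while joining, reconciling, or stable.
* An immediate heartbeat to acknowledge a reconciled assignment.
* Epoch 0 while joining and epoch -1 to leave.

Below is a minimal sketch of that policy. `HbMemberState` and `HeartbeatScheduleSketch` are hypothetical names. The sketch assumes that `FATAL` sends nothing, as the removed `shouldSendHeartbeat()` did for `FAILED`, and that the leave heartbeat is also sent without waiting for the interval.

```java
import java.util.OptionalLong;

// Hypothetical sketch of per-state heartbeat scheduling, derived from the MemberState Javadoc.
enum HbMemberState {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL
}

final class HeartbeatScheduleSketch {
    static final int JOIN_GROUP_EPOCH = 0;   // epoch sent while JOINING
    static final int LEAVE_GROUP_EPOCH = -1; // epoch sent on the heartbeat that leaves the group

    /** Milliseconds until the next heartbeat, or empty if the member should not heartbeat. */
    static OptionalLong timeToNextHeartbeatMs(HbMemberState state, long msSinceLastHeartbeat, long intervalMs) {
        switch (state) {
            case ACKNOWLEDGING: // ack the reconciled assignment without waiting for the interval
            case LEAVING:       // assumption: the leave heartbeat is also sent right away
                return OptionalLong.of(0L);
            case JOINING:
            case RECONCILING:
            case STABLE:
                // Regular heartbeats on the interval.
                return OptionalLong.of(Math.max(0L, intervalMs - msSinceLastHeartbeat));
            default:
                // UNSUBSCRIBED, FENCED, PREPARE_LEAVING and FATAL send no heartbeats.
                return OptionalLong.empty();
        }
    }

    /** Epoch carried by the next heartbeat request. */
    static int epochToSend(HbMemberState state, int memberEpoch) {
        switch (state) {
            case JOINING:
                return JOIN_GROUP_EPOCH;
            case LEAVING:
                return LEAVE_GROUP_EPOCH;
            default:
                return memberEpoch;
        }
    }
}
```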
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395132794 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -137,39 +135,104 @@ public HeartbeatRequestManager( } /** - * Determines the maximum wait time until the next poll based on the member's state, and creates a heartbeat - * request. + * This will ensure that the member starts sending heartbeats to join the group with the + * updated subscription, if it is not already part of it. If the member is already part of + * the group, this will only ensure that the updated subscription is sent on the next + * heartbeat request. No action will be taken if the member is in a {@link MemberState#FATAL} + * state. + * + * Note that the list of topics of the subscription is taken from the shared subscription state. + */ +public void onSubscriptionUpdated() { +if (membershipManager.state() == MemberState.FATAL) { +logger.debug("No action taken to join the group or update the subscription because " + +"the member is in FATAL state"); +return; +} + +if (membershipManager.state() == MemberState.UNSUBSCRIBED) { +membershipManager.transitionToJoining(); +} +} + +/** + * Release assignment and send heartbeat request to leave the group. If the member is not + * part of the group or is in a FATAL state this won't take any action and will return a + * completed future. + * + * @return Future that will complete when the callback execution completes and the heartbeat + * request to leave is sent out. The future will fail if the callback execution fails. + */ +public CompletableFuture onUnsubscribe() { +boolean notInGroup = +membershipManager.state() == MemberState.UNSUBSCRIBED || +membershipManager.state() == MemberState.FATAL; +if (notInGroup) { +return CompletableFuture.completedFuture(null); +} +// TODO: Consider no-op if member is already LEAVING too (repeated calls to unsubscribe +// potentially storming the broker?).
To double check the current behaviour, as it does +// not seem to handle it that way. Review Comment: Done (now in the membership manager `leaveGroup`). No-op if already leaving, and returning the future that will complete when the ongoing leave completes. Also handling the case where the member already left (no-op and return right away)
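Below is a minimal sketch of the idempotent leave described in this reply. It uses hypothetical names (`LeaveGroupSketch`, `ongoingLeave`, `onLeaveHeartbeatSent`), and a pre-built future stands in for the revoke/lost callback execution. A repeated call while leaving returns the in-flight future. A call after the member has left (or failed) returns an already-completed future.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of an idempotent leaveGroup(); names are illustrative, not Kafka's.
final class LeaveGroupSketch {
    enum State { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, FATAL }

    private State state = State.STABLE;
    // Future of the leave currently in flight (null until a leave starts).
    private CompletableFuture<Void> ongoingLeave;
    // Stands in for the onPartitionsRevoked/onPartitionsLost callback execution.
    private final CompletableFuture<Void> releaseCallbacks;

    LeaveGroupSketch(CompletableFuture<Void> releaseCallbacks) {
        this.releaseCallbacks = releaseCallbacks;
    }

    State state() {
        return state;
    }

    CompletableFuture<Void> leaveGroup() {
        if (state == State.UNSUBSCRIBED || state == State.FATAL) {
            // Not in the group (never joined, already left, or failed): no-op.
            return CompletableFuture.completedFuture(null);
        }
        if (state == State.PREPARE_LEAVING || state == State.LEAVING) {
            // Repeated unsubscribe: reuse the ongoing leave instead of starting another.
            return ongoingLeave;
        }
        state = State.PREPARE_LEAVING;
        // Whether the callbacks succeed or fail, move on to send the leave heartbeat.
        // The returned future still completes exceptionally if the callbacks fail.
        ongoingLeave = releaseCallbacks.whenComplete((result, error) -> state = State.LEAVING);
        return ongoingLeave;
    }

    /** Called once the heartbeat with epoch -1 has gone out. */
    void onLeaveHeartbeatSent() {
        if (state == State.LEAVING) {
            state = State.UNSUBSCRIBED;
        }
    }
}
```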
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395130865 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -137,39 +135,104 @@ public HeartbeatRequestManager( } /** - * Determines the maximum wait time until the next poll based on the member's state, and creates a heartbeat - * request. + * This will ensure that the member starts sending heartbeats to join the group with the + * updated subscription, if it is not already part of it. If the member is already part of + * the group, this will only ensure that the updated subscription is sent on the next + * heartbeat request. No action will be taken if the member is in a {@link MemberState#FATAL} + * state. + * + * Note that the list of topics of the subscription is taken from the shared subscription state. + */ +public void onSubscriptionUpdated() { Review Comment: Agree, I merged them into the membership manager, and this actually goes in the same direction we've discussed about the membership manager becoming a first-class manager (supporting poll, for instance). So for now I integrated it with the `ApplicationEventProcessor` already, to be able to move these 2 funcs that I totally agree make sense in the membership manager (when tackling the poll for triggering reconciliations, I will extend it in this same direction)
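This is a sketch of `onSubscriptionUpdated()` once it sits in the membership manager, following the Javadoc quoted above. It takes no action in `FATAL` and transitions to `JOINING` from `UNSUBSCRIBED`. In any other state it only makes sure the next heartbeat carries the updated subscription. `SubscriptionUpdateSketch` and its `sendSubscriptionOnNextHeartbeat` flag are hypothetical.

```java
// Hypothetical sketch of onSubscriptionUpdated() living in the membership manager.
final class SubscriptionUpdateSketch {
    enum State { UNSUBSCRIBED, JOINING, STABLE, FATAL }

    private State state;
    // Illustrative flag: the next heartbeat should carry the (updated) subscribed topics.
    private boolean sendSubscriptionOnNextHeartbeat;

    SubscriptionUpdateSketch(State initial) {
        this.state = initial;
    }

    State state() {
        return state;
    }

    boolean sendSubscriptionOnNextHeartbeat() {
        return sendSubscriptionOnNextHeartbeat;
    }

    void onSubscriptionUpdated() {
        if (state == State.FATAL) {
            // No action: the member cannot join or update its subscription.
            return;
        }
        if (state == State.UNSUBSCRIBED) {
            // Not in the group yet: start heartbeating to join (epoch 0).
            state = State.JOINING;
        }
        // Either way, the topics are read from the shared subscription state and
        // included in the next heartbeat request.
        sendSubscriptionOnNextHeartbeat = true;
    }
}
```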
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395108554 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment " + +"after member got fenced. Member will rejoin the group anyway.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe). + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced. + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
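The epoch rule in the `invokeOnPartitionsRevokedOrLostToReleaseAssignment` Javadoc can be sketched as follows. `Tp` is a hypothetical stand-in for `TopicPartition`. `callbackFor` does not invoke a real `ConsumerRebalanceListener`; it only returns the name of the callback that would run. The comparator mirrors the "topic name, then partition id" ordering described for `COMPARATOR`.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: which rebalance callback releases the assignment, by member epoch.
final class ReleaseAssignmentSketch {
    /** Illustrative stand-in for org.apache.kafka.common.TopicPartition. */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    /** Orders partitions by topic name, then partition id. */
    static final Comparator<Tp> COMPARATOR =
            Comparator.comparing((Tp tp) -> tp.topic).thenComparingInt(tp -> tp.partition);

    /** Copies the current assignment into a sorted set, as the quoted code does. */
    static SortedSet<Tp> droppedPartitions(Set<Tp> assigned) {
        SortedSet<Tp> dropped = new TreeSet<>(COMPARATOR);
        dropped.addAll(assigned);
        return dropped;
    }

    /** Name of the callback that would release the given assignment for this epoch. */
    static String callbackFor(int memberEpoch, Set<Tp> assigned) {
        if (droppedPartitions(assigned).isEmpty()) {
            return "none"; // nothing to release: complete right away
        }
        // epoch > 0: still in the group (e.g. unsubscribe), so partitions are revoked.
        // epoch <= 0: no longer in the group (e.g. fenced), so partitions are lost.
        return memberEpoch > 0 ? "onPartitionsRevoked" : "onPartitionsLost";
    }
}
```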
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394789418 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment " + +"after member got fenced. Member will rejoin the group anyway.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe). + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced. + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394786770 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment " + +"after member got fenced. Member will rejoin the group anyway.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} +
+/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe). + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced. + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394774045 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment " + +"after member got fenced. Member will rejoin the group anyway.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment.
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe). + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced. + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394759427 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -168,10 +308,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { -setTargetAssignment(assignment); +transitionTo(MemberState.RECONCILING); Review Comment: You got it right, I expect the same thing (I had this [testReconciliationSkippedWhenSameAssignmentReceived](https://github.com/apache/kafka/blob/280652bc2a794bdd942506931bca756673bbf361/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java#L559) for that)
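The expectation discussed here is that a heartbeat response repeating the assignment the member already owns should not trigger another reconciliation. One way to sketch this is to compare the target with the current assignment before doing any work. `SameAssignmentSketch` is hypothetical and tracks partitions as plain `topic-partition` strings. The linked test exercises the real behaviour.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: skip reconciliation when the target equals what the member already owns.
final class SameAssignmentSketch {
    private Set<String> current = new TreeSet<>(); // owned partitions, as "topic-partition"
    private Set<String> lastRevoked = Collections.emptySet();
    private int reconciliations = 0;

    int reconciliations() {
        return reconciliations;
    }

    Set<String> lastRevoked() {
        return lastRevoked;
    }

    /** Returns true if a reconciliation ran, false if it was skipped. */
    boolean onTargetAssignment(Set<String> target) {
        if (target.equals(current)) {
            // Same assignment: nothing to revoke or assign, so no commit and no callbacks.
            return false;
        }
        Set<String> revoked = new TreeSet<>(current);
        revoked.removeAll(target);
        // A real reconciliation would commit if needed, then call onPartitionsRevoked for
        // the revoked partitions and onPartitionsAssigned for the new ones.
        lastRevoked = revoked;
        reconciliations++;
        current = new TreeSet<>(target);
        return true;
    }
}
```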
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394753399 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -77,32 +138,111 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ -private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; +private Set currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ -private Optional targetAssignment; +private final SubscriptionState subscriptions; + +/** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ +private final ConsumerMetadata metadata; + +/** + * TopicPartition comparator based on topic name and partition id. + */ +private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); /** * Logger. */ private final Logger log; -public MembershipManagerImpl(String groupId, LogContext logContext) { -this(groupId, null, null, logContext); +/** + * Manager to perform commit requests needed before revoking partitions (if auto-commit is + * enabled) + */ +private final CommitRequestManager commitRequestManager; + +/** + * Local cache of assigned topic IDs and names. Topics are added here when received in a + * target assignment, as we discover their names in the Metadata cache, and removed when the + * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata + * requests in cases where a currently assigned topic is in the target assignment (new + * partition assigned, or revoked), but it is not present the Metadata cache at that moment. 
+ * The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the + * member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}). + */ +private final Map assignedTopicNamesCache; + +/** + * Topic IDs received in a target assignment for which we haven't found topic names yet. + * Items are added to this set every time a target assignment is received. Items are removed + * when metadata is found for the topic. This is where the member collects all assignments + * received from the broker, even though they may not be ready to reconcile due to missing + * metadata. + */ +private final Map> assignmentUnresolved; + +/** + * Assignment received for which topic names have been resolved, so it's ready to be + * reconciled. Items are added to this set when received in a target assignment (if metadata + * available), or when a metadata update is received. This is where the member keeps all the + * assignment ready to reconcile, even though the reconciliation might need to wait if there + * is already another on in process. + */ +private final SortedSet assignmentReadyToReconcile; + +/** + * Epoch that a member must include a heartbeat request to indicate that it want to join or + * re-join a group. + */ +public static final int JOIN_GROUP_EPOCH = 0; + +/** + * If there is a reconciliation running (triggering commit, callbacks) for the + * assignmentReadyToReconcile. This will be true if {@link #reconcile()} has been triggered + * after receiving a heartbeat response, or a metadata update. + */ +private boolean reconciliationInProgress; + +/** + * ID the member had when the reconciliation in progress started. This is used to identify if + * the member has rejoined while it was reconciling an assignment (in which case the result + * of the reconciliation is not applied.) + */ +private String memberIdOnReconciliationStart; Review Comment: Yes, I just updated this. 
We're on the same page that the client keeps the member ID forever and provides it back, but I was wrongly expecting it to change after rejoining. Updated now. The goal is to be able to identify a rejoin, so I'm using the member epoch instead (expecting that a member gets a bumped epoch every time it rejoins).
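The approach described in the reply (remember the member epoch when a reconciliation starts, and discard the result if the epoch differs when it completes) could look roughly like the sketch below. It relies on the assumption stated in the comment that every rejoin yields a different epoch; the class, field, and method names are hypothetical.

```java
// Hypothetical sketch; names are illustrative, not Kafka's.
class RejoinDetectionSketch {
    private int memberEpoch;
    private int memberEpochOnReconciliationStart;
    private String appliedAssignment;

    RejoinDetectionSketch(int initialEpoch) {
        this.memberEpoch = initialEpoch;
    }

    void startReconciliation() {
        // Remember the epoch at the moment the async work (commit, callbacks) begins.
        memberEpochOnReconciliationStart = memberEpoch;
    }

    void rejoinWithEpoch(int newEpoch) {
        // Assumption from the thread: every rejoin ends with a different (bumped) epoch.
        memberEpoch = newEpoch;
    }

    /** Applies the reconciled assignment unless the member rejoined in the meantime. */
    boolean completeReconciliation(String assignment) {
        if (memberEpoch != memberEpochOnReconciliationStart) {
            return false; // stale result from before the rejoin: discard it
        }
        appliedAssignment = assignment;
        return true;
    }

    String appliedAssignment() {
        return appliedAssignment;
    }
}
```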
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394746271 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); Review Comment: sure, done. Logging error in the same way that it's done for other callback failures.
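The `transitionToFenced()` hunk quoted above releases the assignment through `onPartitionsLost` and rejoins whether or not the callback fails. A simplified model of that ordering is sketched below; the state enum, the list-based "log", and resetting the epoch to 0 are hypothetical stand-ins for the real fields and `resetEpoch()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of the fenced -> joining flow in the diff.
class FencedFlowSketch {
    enum State { STABLE, FENCED, JOINING }

    State state = State.STABLE;
    int memberEpoch = 4;
    final List<String> assigned = new ArrayList<>(List.of("t1-0", "t1-1"));
    final List<String> logs = new ArrayList<>();

    void transitionToFenced(CompletableFuture<Void> onPartitionsLostResult) {
        memberEpoch = 0; // assumed effect of resetEpoch(): rejoin with epoch 0
        state = State.FENCED;
        onPartitionsLostResult.whenComplete((result, error) -> {
            if (error != null) {
                // A failing user callback is logged but does not block the rejoin.
                logs.add("onPartitionsLost failed: " + error.getMessage());
            }
            assigned.clear();      // release the assignment in any case
            state = State.JOINING; // then rejoin
        });
    }
}
```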
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394742401 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); Review Comment: You're right, we do. Added it, along with a log in case of error.
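For contrast with the fenced case, `transitionToFatal()` in the hunk sets the epoch to `LEAVE_GROUP_MEMBER_EPOCH`, fires `onPartitionsLost`, and moves to `FATAL` without waiting for the callback or rejoining. A hedged sketch of that ordering, with the "log in case of error" mentioned in the reply, follows. The constant value -1 is assumed to mirror `ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH`, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the fatal transition; not the Kafka implementation.
class FatalFlowSketch {
    enum State { STABLE, FATAL }

    // Assumed to mirror ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;

    State state = State.STABLE;
    int memberEpoch = 4;
    final List<String> pendingAssignment = new ArrayList<>(List.of("t2-0"));
    final List<String> logs = new ArrayList<>();

    void transitionToFatal(CompletableFuture<Void> onPartitionsLostResult) {
        // Epoch <= 0 marks the member as out of the group, so "lost" (not "revoked") applies.
        memberEpoch = LEAVE_GROUP_MEMBER_EPOCH;
        onPartitionsLostResult.whenComplete((result, error) -> {
            if (error != null) {
                logs.add("onPartitionsLost failed after fatal error: " + error.getMessage());
            }
        });
        // Unlike the fenced flow, nothing waits for the callback and no rejoin is triggered.
        pendingAssignment.clear();
        state = State.FATAL;
    }
}
```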
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394733589 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + Review Comment: Yes, done. That's how it's done for other callbacks (aligned the messages too to make them consistent for all callbacks)
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394646561 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
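The `leaveGroup()` hunk and the Javadoc of `invokeOnPartitionsRevokedOrLostToReleaseAssignment()` describe a rule worth isolating: with epoch > 0 the assignment is released via `onPartitionsRevoked`, with epoch <= 0 via `onPartitionsLost`, and the leave heartbeat follows regardless of the callback outcome. The sketch below models that rule under simplifying assumptions: callbacks are recorded as strings and complete immediately, and `SENDING_LEAVE_REQUEST` is a hypothetical state name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the leave-group flow; state names and fields are illustrative.
class LeaveGroupSketch {
    enum State { STABLE, PREPARE_LEAVING, SENDING_LEAVE_REQUEST }

    State state = State.STABLE;
    final int memberEpoch;
    final SortedSet<String> assigned = new TreeSet<>();
    final List<String> invokedCallbacks = new ArrayList<>();

    LeaveGroupSketch(int memberEpoch, List<String> assigned) {
        this.memberEpoch = memberEpoch;
        this.assigned.addAll(assigned);
    }

    CompletableFuture<Void> leaveGroup() {
        state = State.PREPARE_LEAVING;
        if (!assigned.isEmpty()) {
            // Epoch > 0: still in the group, so partitions are revoked; otherwise they are lost.
            String callback = memberEpoch > 0 ? "onPartitionsRevoked" : "onPartitionsLost";
            invokedCallbacks.add(callback + assigned);
        }
        // Stand-in for the future returned by the real (async) callback invocation.
        CompletableFuture<Void> callbackResult = CompletableFuture.completedFuture(null);
        callbackResult.whenComplete((result, error) -> {
            // Clear the assignment and send the leave heartbeat whether or not the callback failed.
            assigned.clear();
            state = State.SENDING_LEAVE_REQUEST;
        });
        // The caller waits only for the callbacks, not for the leave heartbeat itself.
        return callbackResult;
    }
}
```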
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394629300 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394619681 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394585362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -77,32 +138,111 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ -private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; +private Set currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ -private Optional targetAssignment; +private final SubscriptionState subscriptions; + +/** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ +private final ConsumerMetadata metadata; + +/** + * TopicPartition comparator based on topic name and partition id. + */ +private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); /** * Logger. */ private final Logger log; -public MembershipManagerImpl(String groupId, LogContext logContext) { -this(groupId, null, null, logContext); +/** + * Manager to perform commit requests needed before revoking partitions (if auto-commit is + * enabled) + */ +private final CommitRequestManager commitRequestManager; + +/** + * Local cache of assigned topic IDs and names. Topics are added here when received in a + * target assignment, as we discover their names in the Metadata cache, and removed when the + * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata + * requests in cases where a currently assigned topic is in the target assignment (new + * partition assigned, or revoked), but it is not present the Metadata cache at that moment. 
+ * The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the + * member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}). + */ +private final Map assignedTopicNamesCache; + +/** + * Topic IDs received in a target assignment for which we haven't found topic names yet. + * Items are added to this set every time a target assignment is received. Items are removed + * when metadata is found for the topic. This is where the member collects all assignments + * received from the broker, even though they may not be ready to reconcile due to missing + * metadata. + */ +private final Map> assignmentUnresolved; + +/** + * Assignment received for which topic names have been resolved, so it's ready to be + * reconciled. Items are added to this set when received in a target assignment (if metadata + * available), or when a metadata update is received. This is where the member keeps all the + * assignment ready to reconcile, even though the reconciliation might need to wait if there + * is already another on in process. + */ +private final SortedSet assignmentReadyToReconcile; + +/** + * Epoch that a member must include a heartbeat request to indicate that it want to join or + * re-join a group. + */ +public static final int JOIN_GROUP_EPOCH = 0; Review Comment: Totally, done.
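The field Javadocs in this hunk (`assignedTopicNamesCache`, `assignmentUnresolved`, `assignmentReadyToReconcile`) describe a small resolution pipeline: topic IDs from the target assignment wait as unresolved until a name is found, either in the local cache or in metadata, and then move to the ready-to-reconcile set. A simplified sketch follows. It uses `java.util.UUID` in place of Kafka's `Uuid`, a plain map in place of the metadata cache, and string partitions; all of these are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical sketch: java.util.UUID stands in for Kafka's Uuid, partitions are "topic-partition" strings.
class AssignmentResolverSketch {
    final Map<UUID, String> metadataTopicNames = new HashMap<>(); // stand-in for the metadata cache
    final Map<UUID, String> assignedTopicNamesCache = new HashMap<>();
    final Map<UUID, SortedSet<Integer>> assignmentUnresolved = new HashMap<>();
    final SortedSet<String> assignmentReadyToReconcile = new TreeSet<>();

    /** Every received target assignment is first collected as unresolved. */
    void onTargetAssignment(Map<UUID, SortedSet<Integer>> target) {
        target.forEach((id, partitions) ->
                assignmentUnresolved.computeIfAbsent(id, k -> new TreeSet<>()).addAll(partitions));
        resolve();
    }

    /** Also meant to run on every metadata update. */
    void resolve() {
        Iterator<Map.Entry<UUID, SortedSet<Integer>>> it = assignmentUnresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<UUID, SortedSet<Integer>> entry = it.next();
            Optional<String> name = lookup(entry.getKey());
            if (name.isPresent()) {
                assignedTopicNamesCache.put(entry.getKey(), name.get());
                for (int p : entry.getValue()) {
                    assignmentReadyToReconcile.add(name.get() + "-" + p);
                }
                it.remove(); // resolved: no longer waiting for metadata
            }
        }
    }

    private Optional<String> lookup(UUID id) {
        // Prefer the local cache so already-assigned topics don't need a metadata lookup.
        String cached = assignedTopicNamesCache.get(id);
        return cached != null ? Optional.of(cached) : Optional.ofNullable(metadataTopicNames.get(id));
    }
}
```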
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394581206 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -77,32 +138,111 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ -private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; +private Set currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ -private Optional targetAssignment; +private final SubscriptionState subscriptions; + +/** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ +private final ConsumerMetadata metadata; + +/** + * TopicPartition comparator based on topic name and partition id. + */ +private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); Review Comment: Done, re-arranged a couple of them.
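The `COMPARATOR` field is documented as ordering partitions by topic name and then partition id, which is what keeps sets such as `assignmentReadyToReconcile` and the dropped-partitions `TreeSet` in a stable order. A minimal equivalent built with `java.util.Comparator` is sketched below; `TopicPartitionSketch` is a hypothetical stand-in for Kafka's `TopicPartition`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

final class TopicPartitionComparatorSketch {
    // Order by topic name first, then by partition id.
    static final Comparator<TopicPartitionSketch> COMPARATOR =
            Comparator.comparing((TopicPartitionSketch tp) -> tp.topic)
                      .thenComparingInt(tp -> tp.partition);

    static SortedSet<TopicPartitionSketch> sorted(List<TopicPartitionSketch> partitions) {
        SortedSet<TopicPartitionSketch> set = new TreeSet<>(COMPARATOR);
        set.addAll(partitions);
        return set;
    }
}
```

Because a `TreeSet` uses its comparator to decide equality, two entries with the same topic and partition collapse into one even though the sketch class defines no `equals`/`hashCode`.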
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1394570775 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -17,25 +17,86 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; /** - * Membership manager that maintains group membership for a single member, following the new - * consumer group protocol. + * Group manager for a single consumer that has a group id defined in the config + * {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset management capability, + * and the consumer group protocol to get automatically assigned partitions when calling the + * subscribe API. 
+ * + * + * + * While the subscribe API hasn't been called (or if the consumer called unsubscribe), this manager + * will only be responsible for keeping the member in the {@link MemberState#UNSUBSCRIBED} state, + * where it can commit offsets to the group identified by the {@link #groupId()}, without joining + * the group. + * + * + * + * If the consumer subscribe API is called, this manager will use the {@link #groupId()} to join the + * consumer group, and based on the consumer group protocol heartbeats, will handle the full + * lifecycle of the member as it joins the group, reconciles assignments, handles fencing and + * fatal errors, and leaves the group. + * * - * This is responsible for: - * Keeping member info (ex. member id, member epoch, assignment, etc.) - * Keeping member state as defined in {@link MemberState}. + * + * Reconciliation process: + * The member accepts all assignments received from the broker, resolves topic names from + * metadata, reconciles the resolved assignments, and keeps the unresolved to be reconciled when + * discovered with a metadata update. Reconciliations of resolved assignments are executed + * sequentially and acknowledged to the server as they complete. The reconciliation process + * involves multiple async operations, so the member will continue to heartbeat while these + * operations complete, to make sure that the member stays in the group while reconciling. + * * - * Member info and state are updated based on the heartbeat responses the member receives. + * + * Reconciliation steps: + * + * Resolve topic names for all topic IDs received in the target assignment. Topic names + * found in metadata are then ready to be reconciled. Topic IDs not found are kept as + * unresolved, and the member request metadata updates until it resolves them (or the broker + * removes it from the target assignment. + * Commit offsets if auto-commit is enabled. + * Invoke the user-defined onPartitionsRevoked listener. 
+ * Invoke the user-defined onPartitionsAssigned listener. + * When the above steps complete, the member acknowledges the target assignment by + * sending a heartbeat request back to the broker, including the full target assignment + * that was just reconciled. Review Comment: Yes, you're right, I will rephrase this. It acknowledges the reconciled assignment, which is the subset of the target that was resolved from metadata and actually reconciled.
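Taking the reply's correction into account, the listed reconciliation steps can be sketched as a synchronous sequence that ends by acknowledging only the resolved subset of the target. This is a simplification under stated assumptions: the real steps are asynchronous, the step strings are illustrative, and skipping callbacks for empty sets is a choice made for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, synchronous sketch of the reconciliation steps; not the Kafka implementation.
class ReconciliationStepsSketch {
    final List<String> steps = new ArrayList<>();

    /**
     * @param owned          partitions currently owned by the member
     * @param resolvedTarget part of the target whose topic names were resolved from metadata
     * @param autoCommit     whether auto-commit is enabled
     * @return the assignment acknowledged in the next heartbeat
     */
    SortedSet<String> reconcile(SortedSet<String> owned, SortedSet<String> resolvedTarget, boolean autoCommit) {
        SortedSet<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(resolvedTarget);
        SortedSet<String> added = new TreeSet<>(resolvedTarget);
        added.removeAll(owned);

        if (autoCommit) {
            steps.add("commit"); // commit offsets if auto-commit is enabled
        }
        if (!revoked.isEmpty()) {
            steps.add("onPartitionsRevoked" + revoked);
        }
        if (!added.isEmpty()) {
            steps.add("onPartitionsAssigned" + added);
        }
        // Acknowledge what was actually reconciled (the resolved subset), not the full target.
        steps.add("ack" + resolvedTarget);
        return new TreeSet<>(resolvedTarget);
    }
}
```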
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394493344

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.debug("OnPartitionsLost callback invocation failed while releasing assignment " +
+                        "after member got fenced. Member will rejoin the group anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public void transitionToJoining() {
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        // Reset member ID of the reconciliation in progress (if any), to make sure that if the
+        // reconciliation completes while the member is rejoining but hasn't received the new
+        // member ID yet, the reconciliation result is discarded.
+        memberIdOnReconciliationStart = null;
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * {@inheritDoc}
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    @Override
+    public CompletableFuture leaveGroup() {
+        transitionTo(MemberState.PREPARE_LEAVING);
+
+        CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+
+            // Clear the subscription, no matter if the callback execution failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to effectively leave the
+            // group (even in the case where the member had no assignment to release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return callback future to indicate that the leave group is done when the callbacks
+        // complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+        // but do not hold the leave group operation for it)
+        return callbackResult;
+    }
+
+    /**
+     * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+     * onPartitionsLost.
+     *
+     * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+     * This will be the case when releasing assignment because the member is intentionally
+     * leaving the group (after a call to unsubscribe)
+     *
+     * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+     * This will be the case when releasing assignment after being fenced.
+     *
+     *
+     * @return Future that will complete when the callback execution completes.
+     */
+    private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet droppedPartitions = new TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
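The two ideas in this hunk, choosing onPartitionsRevoked or onPartitionsLost from the member epoch and always clearing the assignment in `whenComplete` whether or not the callback failed, can be shown in isolation. A minimal sketch, assuming a hypothetical `ReleaseAssignmentSketch` class with string partitions and string state labels (`SENDING_LEAVE_GROUP` is an illustrative label, not necessarily the PR's enum value); user callbacks are simulated as already-completed futures.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical, simplified model of releasing an assignment; names and state labels are illustrative.
class ReleaseAssignmentSketch {
    int memberEpoch;                                    // > 0 while the member is in the group
    final SortedSet<String> assigned = new TreeSet<>();
    final List<String> invoked = new ArrayList<>();     // which user callback ran, and with what
    boolean callbackFails;                              // lets a caller simulate a failing user callback
    String state = "STABLE";

    /** Picks onPartitionsRevoked while in the group (epoch > 0), onPartitionsLost otherwise. */
    CompletableFuture<Void> releaseAssignment() {
        if (assigned.isEmpty()) {
            return CompletableFuture.completedFuture(null); // no assignment to release
        }
        String callback = memberEpoch > 0 ? "onPartitionsRevoked" : "onPartitionsLost";
        invoked.add(callback + " " + assigned);
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (callbackFails) {
            result.completeExceptionally(new IllegalStateException("user callback failed"));
        } else {
            result.complete(null);
        }
        return result;
    }

    CompletableFuture<Void> leaveGroup() {
        state = "PREPARE_LEAVING";
        CompletableFuture<Void> callbackResult = releaseAssignment();
        // whenComplete runs on success and on failure, so the assignment is always cleared
        // and the transition that sends the leave heartbeat is never skipped.
        callbackResult.whenComplete((ignored, error) -> {
            assigned.clear();
            state = "SENDING_LEAVE_GROUP";
        });
        // As in the quoted code, the caller gets the callback future, not the heartbeat outcome.
        return callbackResult;
    }
}
```

Returning `callbackResult` rather than the future produced by `whenComplete` means a caller still observes a callback failure, while the cleanup has already run.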
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394344222

## clients/src/main/java/org/apache/kafka/clients/MetadataCache.java:
##
@@ -102,6 +107,10 @@ Map topicIds() {
         return topicIds;
     }
 
+    Map topicNames() {

Review Comment:
Yes, done.
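The hunk only shows the signature of the new `topicNames()` accessor next to `topicIds()`, and the archive has dropped the generic types. Assuming it is the inverse of `topicIds()` (topic ID to topic name, which is what resolving assignment topic IDs needs), a minimal sketch of building such a view is below. `TopicNamesSketch` is hypothetical and `java.util.UUID` stands in for Kafka's own `Uuid` type.

```java
import java.util.*;

// Hypothetical helper; java.util.UUID stands in for Kafka's own Uuid type.
class TopicNamesSketch {
    /** Inverts a topic name -> topic ID map into a topic ID -> topic name view. */
    static Map<UUID, String> topicNames(Map<String, UUID> topicIds) {
        Map<UUID, String> names = new HashMap<>(topicIds.size());
        topicIds.forEach((name, id) -> names.put(id, name));
        return Collections.unmodifiableMap(names);
    }
}
```

Whether the real cache builds this map eagerly or derives it on each call is not visible in the hunk; precomputing it once per metadata update keeps lookups by topic ID constant-time.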
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394322340

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -260,42 +924,52 @@ public Optional serverAssignor() {
      * {@inheritDoc}
      */
     @Override
-    public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() {
+    public Set currentAssignment() {
         return this.currentAssignment;
     }
 
     /**
-     * @return Assignment that the member received from the server but hasn't completely processed
-     * yet. Visible for testing.
+     * @return Set of topic IDs received in a target assignment that have not been reconciled yet
+     * because topic names are not in metadata. Visible for testing.
      */
-    Optional targetAssignment() {
-        return targetAssignment;
+    Set topicsWaitingForMetadata() {
+        return Collections.unmodifiableSet(assignmentUnresolved.keySet());
     }
 
     /**
-     * This indicates that the reconciliation of the target assignment has been successfully
-     * completed, so it will make it effective by assigning it to the current assignment.
-     *
-     * @params Assignment that has been successfully reconciled. This is expected to
-     * match the target assignment defined in {@link #targetAssignment()}
+     * @return Topic partitions received in a target assignment that have been resolved in
+     * metadata and are ready to be reconciled. Visible for testing.
+     */
+    Set assignmentReadyToReconcile() {
+        return Collections.unmodifiableSet(assignmentReadyToReconcile);
+    }
+
+    /**
+     * @return If there is a reconciliation in process now. Note that reconciliation is triggered
+     * by a call to {@link #reconcile()}. Visible for testing.
+     */
+    boolean reconciliationInProgress() {
+        return reconciliationInProgress;
+    }
+
+    /**
+     * When cluster metadata is updated, try to resolve topic names for topic IDs received in
+     * assignment that hasn't been resolved yet.
+     *
+     * Try to find topic names for all unresolved assignments
+     * Add discovered topic names to the local topic names cache
+     * If any topics are resolved, trigger a reconciliation process
+     * If some topics still remain unresolved, request another metadata update
+     *
      */
     @Override
-    public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
-        if (assignment == null) {
-            throw new IllegalArgumentException("Assignment cannot be null");
-        }
-        if (!assignment.equals(targetAssignment.orElse(null))) {
-            // This could be simplified to remove the assignment param and just assume that what
-            // was reconciled was the targetAssignment, but keeping it explicit and failing fast
-            // here to uncover any issues in the interaction of the assignment processing logic
-            // and this.
-            throw new IllegalStateException(String.format("Reconciled assignment %s does not " +
-                    "match the expected target assignment %s", assignment,
-                    targetAssignment.orElse(null)));
+    public void onUpdate(ClusterResource clusterResource) {
+        resolveMetadataForUnresolvedAssignment();
+        if (!assignmentReadyToReconcile.isEmpty()) {
+            // TODO: improve reconciliation triggering. Initial approach of triggering on every
+            // HB response and metadata update.

Review Comment:
Filed [KAFKA-15832](https://issues.apache.org/jira/browse/KAFKA-15832) for this and I will take care of it right after this PR as a follow-up.
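The four steps in the `onUpdate` javadoc (find names, cache them, trigger reconciliation if anything resolved, request metadata again if anything is still missing) can be modelled as follows. This is a sketch under assumptions: `MetadataUpdateSketch` is hypothetical, topic IDs are strings, and two counters stand in for calling `reconcile()` and for requesting a metadata update.

```java
import java.util.*;

// Hypothetical model of resolving pending assignments on a metadata update.
class MetadataUpdateSketch {
    final Map<String, Set<Integer>> unresolved = new HashMap<>();        // topic ID -> partitions
    final Map<String, String> assignedTopicNamesCache = new HashMap<>(); // topic ID -> topic name
    final SortedSet<String> readyToReconcile = new TreeSet<>();          // "name-partition"
    int reconciliationsTriggered;                                        // stand-in for calling reconcile()
    int metadataRequests;                                                // stand-in for requesting a metadata update

    void onMetadataUpdate(Map<String, String> metadataTopicNames) {
        Iterator<Map.Entry<String, Set<Integer>>> it = unresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<Integer>> entry = it.next();
            String name = metadataTopicNames.get(entry.getKey());
            if (name == null) {
                continue; // still unknown, keep waiting
            }
            assignedTopicNamesCache.put(entry.getKey(), name);
            for (int partition : entry.getValue()) {
                readyToReconcile.add(name + "-" + partition);
            }
            it.remove(); // resolved, no longer pending
        }
        if (!readyToReconcile.isEmpty()) {
            reconciliationsTriggered++;
        }
        if (!unresolved.isEmpty()) {
            metadataRequests++;
        }
    }
}
```

Removing entries through the iterator keeps the loop safe while it mutates `unresolved`; the TODO in the hunk (tracked as KAFKA-15832) is about refining when the reconciliation trigger fires, which this sketch does not attempt.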
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394053638

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394042794

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1394032677

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +348,560 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392954694

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -181,63 +347,537 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392830639

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -77,32 +138,110 @@ public class MembershipManagerImpl implements MembershipManager {
     /**
      * Assignment that the member received from the server and successfully processed.
      */
-    private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment;
+    private Set currentAssignment;
 
     /**
-     * Assignment that the member received from the server but hasn't completely processed
-     * yet.
+     * Subscription state object holding the current assignment the member has for the topics it
+     * subscribed to.
      */
-    private Optional targetAssignment;
+    private final SubscriptionState subscriptions;
+
+    /**
+     * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}.
+     */
+    private final ConsumerMetadata metadata;
+
+    /**
+     * TopicPartition comparator based on topic name and partition id.
+     */
+    private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator();
 
     /**
      * Logger.
      */
     private final Logger log;
 
-    public MembershipManagerImpl(String groupId, LogContext logContext) {
-        this(groupId, null, null, logContext);
+    /**
+     * Manager to perform commit requests needed before revoking partitions (if auto-commit is
+     * enabled)
+     */
+    private final CommitRequestManager commitRequestManager;
+
+    /**
+     * Local cache of assigned topic IDs and names. Topics are added here when received in a
+     * target assignment, as we discover their names in the Metadata cache, and removed when the
+     * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata
+     * requests in cases where a currently assigned topic is in the target assignment (new
+     * partition assigned, or revoked), but it is not present in the Metadata cache at that moment.
+     * The cache is cleared when the subscription changes ({@link #transitionToJoining()}), the
+     * member fails ({@link #transitionToFatal()}) or leaves the group ({@link #leaveGroup()}).
+     */
+    private final Map assignedTopicNamesCache;
+
+    /**
+     * Topic IDs received in a target assignment for which we haven't found topic names yet.
+     * Items are added to this set every time a target assignment is received. Items are removed
+     * when metadata is found for the topic. This is where the member collects all assignments
+     * received from the broker, even though they may not be ready to reconcile due to missing
+     * metadata.
+     */
+    private final Map> assignmentUnresolved;
+
+    /**
+     * Assignment received for which topic names have been resolved, so it's ready to be
+     * reconciled. Items are added to this set when received in a target assignment (if metadata
+     * available), or when a metadata update is received. This is where the member keeps all the
+     * assignment ready to reconcile, even though the reconciliation might need to wait if there
+     * is already another one in progress.
+     */
+    private final SortedSet assignmentReadyToReconcile;
+
+    /**
+     * Epoch that a member must include in a heartbeat request to indicate that it wants to join or
+     * re-join a group.
+     */
+    public static final int JOIN_GROUP_EPOCH = 0;
+
+    /**
+     * If there is a reconciliation running (triggering commit, callbacks) for the
+     * assignmentReadyToReconcile. This will be true if {@link #reconcile()} has been triggered
+     * after receiving a heartbeat response, or a metadata update.
+     */
+    private boolean reconciliationInProgress;
+
+    /**
+     * ID the member had when the reconciliation in progress started. This is used to identify if
+     * the member has rejoined while it was reconciling an assignment (in which case the result
+     * of the reconciliation is not applied.)
+     */
+    private String memberIdOnReconciliationStart;
+
+

Review Comment:
Those kinds of things tend to jump out in _other people's_ code, but I frequently miss them in my own 😄
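The `memberIdOnReconciliationStart` field, together with its reset in `transitionToJoining()`, guards against applying a reconciliation result that finishes after the member has rejoined. A minimal sketch of that guard, assuming a hypothetical `StaleReconciliationSketch` class in which a caller-supplied future stands in for the asynchronous commit and rebalance callbacks:

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical model of discarding a reconciliation that finishes after the member rejoined.
class StaleReconciliationSketch {
    String memberId = "member-1";
    String memberIdOnReconciliationStart;
    SortedSet<String> currentAssignment = new TreeSet<>();

    /** callbacksDone stands in for the async commit and rebalance callbacks. */
    CompletableFuture<Void> reconcile(SortedSet<String> assignment, CompletableFuture<Void> callbacksDone) {
        memberIdOnReconciliationStart = memberId;
        return callbacksDone.thenRun(() -> {
            // If the member rejoined meanwhile (marker reset or member ID changed),
            // the result belongs to an old membership and is dropped.
            if (!Objects.equals(memberId, memberIdOnReconciliationStart)) {
                return;
            }
            currentAssignment = new TreeSet<>(assignment);
        });
    }

    void transitionToJoining() {
        memberIdOnReconciliationStart = null; // mirrors the reset in the quoted transitionToJoining()
    }
}
```

Comparing with `Objects.equals` covers both cases named in the field's javadoc: the marker being cleared on rejoin, and a new member ID having been assigned before the callbacks complete.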
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1392829031

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java:
##
@@ -25,7 +25,8 @@ public abstract class ApplicationEvent {
     public enum Type {
         COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE,
-        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA
+        LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE,

Review Comment:
I never liked `ASSIGNMENT_CHANGE` either, but I guess it's consistent, so 👍
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392827512 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals.events; + +/** + * Application event triggered when a user calls the unsubscribe API. This will make the consumer + * release all its assignments and send a heartbeat request to leave the consumer group. + * This event holds a future that will complete when the invocation of callbacks to release + * complete and the heartbeat to leave the group is sent out (minimal effort to send the + * leave group heartbeat, without waiting for any response or considering timeouts). + */ +public class UnsubscribeApplicationEvent extends CompletableApplicationEvent { Review Comment: Where does it block? I didn't see a call to `Future.get()` when I looked.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392802141 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -77,32 +138,110 @@ public class MembershipManagerImpl implements MembershipManager { /** * Assignment that the member received from the server and successfully processed. */ -private ConsumerGroupHeartbeatResponseData.Assignment currentAssignment; +private Set currentAssignment; /** - * Assignment that the member received from the server but hasn't completely processed - * yet. + * Subscription state object holding the current assignment the member has for the topics it + * subscribed to. */ -private Optional targetAssignment; +private final SubscriptionState subscriptions; + +/** + * Metadata that allows us to create the partitions needed for {@link ConsumerRebalanceListener}. + */ +private final ConsumerMetadata metadata; + +/** + * TopicPartition comparator based on topic name and partition id. + */ +private final static TopicPartitionComparator COMPARATOR = new TopicPartitionComparator(); /** * Logger. */ private final Logger log; -public MembershipManagerImpl(String groupId, LogContext logContext) { -this(groupId, null, null, logContext); +/** + * Manager to perform commit requests needed before revoking partitions (if auto-commit is + * enabled) + */ +private final CommitRequestManager commitRequestManager; + +/** + * Local cache of assigned topic IDs and names. Topics are added here when received in a + * target assignment, as we discover their names in the Metadata cache, and removed when the + * topic is not in the subscription anymore. The purpose of this cache is to avoid metadata + * requests in cases where a currently assigned topic is in the target assignment (new + * partition assigned, or revoked), but it is not present the Metadata cache at that moment. 
+ * The cache is cleared when the subscription changes ({@link #transitionToJoining()}, the + * member fails ({@link #transitionToFatal()} or leaves the group ({@link #leaveGroup()}). + */ +private final Map assignedTopicNamesCache; + +/** + * Topic IDs received in a target assignment for which we haven't found topic names yet. + * Items are added to this set every time a target assignment is received. Items are removed + * when metadata is found for the topic. This is where the member collects all assignments + * received from the broker, even though they may not be ready to reconcile due to missing + * metadata. + */ +private final Map> assignmentUnresolved; + +/** + * Assignment received for which topic names have been resolved, so it's ready to be + * reconciled. Items are added to this set when received in a target assignment (if metadata + * available), or when a metadata update is received. This is where the member keeps all the + * assignment ready to reconcile, even though the reconciliation might need to wait if there + * is already another on in process. + */ +private final SortedSet assignmentReadyToReconcile; + +/** + * Epoch that a member must include a heartbeat request to indicate that it want to join or + * re-join a group. + */ +public static final int JOIN_GROUP_EPOCH = 0; + +/** + * If there is a reconciliation running (triggering commit, callbacks) for the + * assignmentReadyToReconcile. This will be true if {@link #reconcile()} has been triggered + * after receiving a heartbeat response, or a metadata update. + */ +private boolean reconciliationInProgress; + +/** + * ID the member had when the reconciliation in progress started. This is used to identify if + * the member has rejoined while it was reconciling an assignment (in which case the result + * of the reconciliation is not applied.) 
+ */ +private String memberIdOnReconciliationStart; + + Review Comment: he he, I do avoid this, missed it here, fixed ;)
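To make the `memberIdOnReconciliationStart` field concrete, here is a minimal sketch of how a reconciliation result could be discarded when the member rejoins while it is reconciling. This is not the PR's code. `ReconciliationGuardSketch`, `onMemberIdAssigned` and the string-based partitions are hypothetical simplifications, and the asynchronous commit/callback work is modeled as a plain `CompletableFuture`.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model: only the fields needed to show how a reconciliation
// that outlives a rejoin gets discarded. Topic partitions are plain strings here.
class ReconciliationGuardSketch {
    private String memberId;
    private boolean reconciliationInProgress;
    private String memberIdOnReconciliationStart;
    private Set<String> currentAssignment = Set.of();

    ReconciliationGuardSketch(String memberId) {
        this.memberId = memberId;
    }

    // Starts reconciling `target`; `callbacksDone` stands in for the async commit/callback work.
    boolean reconcile(Set<String> target, CompletableFuture<Void> callbacksDone) {
        if (reconciliationInProgress) {
            return false; // one reconciliation at a time
        }
        reconciliationInProgress = true;
        memberIdOnReconciliationStart = memberId;
        callbacksDone.whenComplete((ignored, error) -> {
            reconciliationInProgress = false;
            // Stale if the member rejoined (new ID) or started rejoining (ID reset to null).
            boolean stale = !Objects.equals(memberIdOnReconciliationStart, memberId);
            if (error == null && !stale) {
                currentAssignment = target;
            }
        });
        return true;
    }

    // Mirrors the diff's transitionToJoining(): resetting the ID invalidates in-flight results.
    void transitionToJoining() {
        memberIdOnReconciliationStart = null;
    }

    // Called when a heartbeat response hands out a (possibly new) member ID.
    void onMemberIdAssigned(String newMemberId) {
        memberId = newMemberId;
    }

    Set<String> currentAssignment() {
        return currentAssignment;
    }
}
```

The member ID captured at the start is compared with the current one, and `transitionToJoining()` nulls the captured ID. Together these cover both cases from the field's Javadoc: the member has rejoined with a new ID, or it is still rejoining and has not received its new ID yet.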
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392790381 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +347,537 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. 
+memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + +clearPendingAssignmentsAndLocalNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearPendingAssignmentsAndLocalNamesCache(); +// Reset member ID of the reconciliation in progress (if any), to make sure that if the +// reconciliation completes while the member is rejoining but hasn't received the new +// member ID yet, the reconciliation result is discarded. +memberIdOnReconciliationStart = null; } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * {@inheritDoc} */ -private boolean maybeTransitionToStable() { -if (!hasPendingTargetAssignment()) { -transitionTo(MemberState.STABLE); +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearPendingAssignmentsAndLocalNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. 
(Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +
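The `leaveGroup()` flow in the diff can be sketched as follows: it runs the release callbacks, clears the assignment whatever the outcome, and then moves to the state that sends the leave heartbeat. This is a hypothetical model, not the actual `MembershipManagerImpl`. It uses made-up names (`LeaveGroupSketch`, `revokeCallback`), string partitions and a three-value state enum.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, trimmed-down model of the leave-group sequence described in the diff.
class LeaveGroupSketch {
    enum State { STABLE, PREPARE_LEAVING, LEAVING }

    State state = State.STABLE;
    final Set<String> assignedPartitions = new TreeSet<>();
    private final Function<Set<String>, CompletableFuture<Void>> revokeCallback;

    LeaveGroupSketch(Function<Set<String>, CompletableFuture<Void>> revokeCallback) {
        this.revokeCallback = revokeCallback;
    }

    CompletableFuture<Void> leaveGroup() {
        state = State.PREPARE_LEAVING;
        CompletableFuture<Void> callbackResult = assignedPartitions.isEmpty()
            ? CompletableFuture.completedFuture(null) // nothing to release
            : revokeCallback.apply(new TreeSet<>(assignedPartitions));
        callbackResult.whenComplete((ignored, error) -> {
            // Clear the assignment whether or not the callback failed...
            assignedPartitions.clear();
            // ...and move on so that the next heartbeat (epoch -1) gets sent out.
            state = State.LEAVING;
        });
        // Callers wait only for the callbacks, not for the heartbeat itself.
        return callbackResult;
    }
}
```

The sketch returns the callback future rather than the `whenComplete` stage. A caller therefore still sees a callback failure, while the transition to the leaving state happens either way.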
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392748787 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -25,7 +25,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, -LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA +LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, Review Comment: It's exactly when the user changes the subscription via a call to subscribe. I used the name `SUBSCRIPTION_CHANGE` because it seemed clear and to be consistent with the existing `ASSIGNMENT_CHANGE`, but let me know if you think another name would be better.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392744058 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,111 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. + */ +PREPARE_LEAVING, + +/** + * Member has committed offsets and releases its assignment, so it stays in this state until + * the next heartbeat request is sent out with epoch -1 to effectively leave the group. This + * state indic
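A state machine like the one documented above is often enforced with an explicit table of allowed transitions. The sketch below is one possible reading of the Javadoc and is only an illustration. `MemberStateSketch`, the transition table and `sendsHeartbeats()` are assumptions, and the actual enum in the PR may allow a different set of transitions.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical transition table inferred from the Javadoc above; the real
// MemberState enum in the PR may allow a different set of transitions.
enum MemberStateSketch {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL;

    private static final Map<MemberStateSketch, Set<MemberStateSketch>> NEXT =
        new EnumMap<>(MemberStateSketch.class);

    static {
        NEXT.put(UNSUBSCRIBED, EnumSet.of(JOINING));                       // subscribe()
        NEXT.put(JOINING, EnumSet.of(RECONCILING, STABLE, FENCED, PREPARE_LEAVING, FATAL));
        NEXT.put(RECONCILING, EnumSet.of(ACKNOWLEDGING, FENCED, PREPARE_LEAVING, FATAL));
        NEXT.put(ACKNOWLEDGING, EnumSet.of(RECONCILING, STABLE, FENCED, PREPARE_LEAVING, FATAL));
        NEXT.put(STABLE, EnumSet.of(RECONCILING, FENCED, PREPARE_LEAVING, FATAL));
        NEXT.put(FENCED, EnumSet.of(JOINING));                             // after onPartitionsLost
        NEXT.put(PREPARE_LEAVING, EnumSet.of(LEAVING));                    // callbacks done
        NEXT.put(LEAVING, EnumSet.of(UNSUBSCRIBED));                       // epoch -1 heartbeat sent
        NEXT.put(FATAL, EnumSet.noneOf(MemberStateSketch.class));          // unrecoverable
    }

    boolean canTransitionTo(MemberStateSketch next) {
        return NEXT.get(this).contains(next);
    }

    // Per the Javadoc: no heartbeats while UNSUBSCRIBED, FENCED or PREPARE_LEAVING;
    // FATAL is assumed to stop heartbeats as the old FAILED state did.
    boolean sendsHeartbeats() {
        return this == JOINING || this == RECONCILING || this == ACKNOWLEDGING
            || this == STABLE || this == LEAVING;
    }
}
```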
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392732597 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection topics, Optional(topics), listener)) metadata.requestUpdateForNewTopics(); + +// Trigger subscribe event to effectively join the group if not already part of it, +// or just send the new subscription to the broker. +applicationEventHandler.add(new SubscriptionChangeApplicationEvent()); Review Comment: I see. I was intentionally leaving out all the pattern-based logic because we don't support it at this point. But this makes me realize that the pattern-based `subscribeInternal` you mentioned is wired to the `subscribe(Pattern pattern)` API call, even though it's not truly supported yet. I think we should disable all pattern-based subscriptions until we implement them properly. What do you think?
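The `applicationEventHandler.add(new SubscriptionChangeApplicationEvent())` call hands work from the application thread to the background thread. Below is a minimal, hypothetical model of that hand-off using a `LinkedBlockingQueue`. It is not Kafka's `ApplicationEventHandler`, and it models only the decision from the diff comment: join if not already in the group, otherwise just send the new subscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the application-thread -> background-thread hand-off.
// Names are illustrative; only the "join if needed, else resend subscription"
// decision from the diff comment is modeled.
class EventQueueSketch {
    enum EventType { SUBSCRIPTION_CHANGE }

    private final BlockingQueue<EventType> queue = new LinkedBlockingQueue<>();
    final List<String> actions = new ArrayList<>();
    private boolean inGroup;

    // Application thread: after updating local subscription state, enqueue and return.
    void subscribe() {
        queue.add(EventType.SUBSCRIPTION_CHANGE);
    }

    // Background thread: one iteration of its run loop.
    void runOnce() {
        List<EventType> drained = new ArrayList<>();
        queue.drainTo(drained);
        for (EventType event : drained) {
            if (event == EventType.SUBSCRIPTION_CHANGE) {
                if (!inGroup) {
                    inGroup = true; // e.g. transition to JOINING and heartbeat with epoch 0
                    actions.add("join");
                } else {
                    actions.add("send-updated-subscription"); // carried by the next heartbeat
                }
            }
        }
    }
}
```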
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392733198 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -260,42 +900,52 @@ public Optional serverAssignor() { * {@inheritDoc} */ @Override -public ConsumerGroupHeartbeatResponseData.Assignment currentAssignment() { +public Set currentAssignment() { return this.currentAssignment; } /** - * @return Assignment that the member received from the server but hasn't completely processed - * yet. Visible for testing. + * @return Set of topic IDs received in a target assignment that have not been reconciled yet + * because topic names are not in metadata. Visible for testing. */ -Optional targetAssignment() { -return targetAssignment; +Set topicsWaitingForMetadata() { +return Collections.unmodifiableSet(assignmentUnresolved.keySet()); } /** - * This indicates that the reconciliation of the target assignment has been successfully - * completed, so it will make it effective by assigning it to the current assignment. - * - * @params Assignment that has been successfully reconciled. This is expected to - * match the target assignment defined in {@link #targetAssignment()} + * @return Topic partitions received in a target assignment that have been resolved in + * metadata and are ready to be reconciled. Visible for testing. + */ +Set assignmentReadyToReconcile() { +return Collections.unmodifiableSet(assignmentReadyToReconcile); +} + +/** + * @return If there is a reconciliation in process now. Note that reconciliation is triggered + * by a call to {@link #reconcile()}. Visible for testing. + */ +boolean reconciliationInProgress() { +return reconciliationInProgress; +} + +/** + * When cluster metadata is updated, try to resolve topic names for topic IDs received in + * assignment that hasn't been resolved yet. 
+ * + * Try to find topic names for all unresolved assignments + * Add discovered topic names to the local topic names cache + * If any topics are resolved, trigger a reconciliation process + * If some topics still remain unresolved, request another metadata update + * */ @Override -public void onTargetAssignmentProcessComplete(ConsumerGroupHeartbeatResponseData.Assignment assignment) { -if (assignment == null) { -throw new IllegalArgumentException("Assignment cannot be null"); -} -if (!assignment.equals(targetAssignment.orElse(null))) { -// This could be simplified to remove the assignment param and just assume that what -// was reconciled was the targetAssignment, but keeping it explicit and failing fast -// here to uncover any issues in the interaction of the assignment processing logic -// and this. -throw new IllegalStateException(String.format("Reconciled assignment %s does not " + -"match the expected target assignment %s", assignment, -targetAssignment.orElse(null))); +public void onUpdate(ClusterResource clusterResource) { +resolveMetadataForUnresolvedAssignment(); +if (!assignmentReadyToReconcile.isEmpty()) { +// TODO: improve reconciliation triggering. Initial approach of triggering on every +// HB response and metadata update. +reconcile(); } Review Comment: Good catch, added.
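The four steps listed in the `onUpdate()` Javadoc can be sketched as follows. Everything here is a simplified assumption. Topic IDs are `String`s rather than `Uuid`s, and partitions are rendered as `"name-partition"` strings instead of `TopicPartition`s. Metadata is a plain ID-to-name map, and counters stand in for requesting a metadata update and for calling `reconcile()`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch of onUpdate(): topic IDs and names are Strings, and a
// partition is rendered as "name-partition" instead of a TopicPartition.
class TopicIdResolutionSketch {
    final Map<String, String> assignedTopicNamesCache = new HashMap<>();           // id -> name
    final Map<String, SortedSet<Integer>> assignmentUnresolved = new HashMap<>();  // id -> partitions
    final SortedSet<String> assignmentReadyToReconcile = new TreeSet<>();
    int metadataUpdatesRequested;
    int reconcileCalls;

    // `metadataTopicNames` stands in for the topic ID -> name view of cluster metadata.
    void onUpdate(Map<String, String> metadataTopicNames) {
        Iterator<Map.Entry<String, SortedSet<Integer>>> it =
            assignmentUnresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SortedSet<Integer>> entry = it.next();
            String topicId = entry.getKey();
            // Prefer the local cache, then fall back to metadata.
            String name = assignedTopicNamesCache.getOrDefault(topicId, metadataTopicNames.get(topicId));
            if (name == null) {
                continue; // still unknown: keep it unresolved
            }
            assignedTopicNamesCache.put(topicId, name);
            for (int partition : entry.getValue()) {
                assignmentReadyToReconcile.add(name + "-" + partition);
            }
            it.remove();
        }
        if (!assignmentUnresolved.isEmpty()) {
            metadataUpdatesRequested++; // stand-in for requesting another metadata update
        }
        if (!assignmentReadyToReconcile.isEmpty()) {
            reconcileCalls++;           // stand-in for reconcile()
        }
    }
}
```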
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1392717259 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals.events; + +/** + * Application event triggered when a user calls the unsubscribe API. This will make the consumer + * release all its assignments and send a heartbeat request to leave the consumer group. + * This event holds a future that will complete when the invocation of callbacks to release + * complete and the heartbeat to leave the group is sent out (minimal effort to send the + * leave group heartbeat, without waiting for any response or considering timeouts). + */ +public class UnsubscribeApplicationEvent extends CompletableApplicationEvent { Review Comment: `Consumer.unsubscribe` does block on the callback execution; that's why it is a `CompletableApplicationEvent`. Only after the callback completes can the unsubscribe send the actual leave-group heartbeat request. Makes sense?
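The blocking behavior described here can be modeled with a future carried inside the event: `unsubscribe()` waits for the background thread to finish the release callbacks, but not for the response to the leave heartbeat. This is a hypothetical sketch, not the real `CompletableApplicationEvent`. The class names and the bounded `get(timeout)` wait are assumptions.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: the event carries a future; the application thread blocks on it
// and the background thread completes it once the release callbacks have run.
class BlockingUnsubscribeSketch {
    static final class UnsubscribeEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    // Application thread: enqueue, then wait (bounded) for the background result.
    static void unsubscribe(BlockingQueue<UnsubscribeEvent> queue, Duration timeout)
            throws InterruptedException, ExecutionException, TimeoutException {
        UnsubscribeEvent event = new UnsubscribeEvent();
        queue.add(event);
        event.future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    // Background thread: take one event, run the callbacks, then complete the future.
    // The leave-group heartbeat would be sent afterwards, without holding up the caller.
    static void processOne(BlockingQueue<UnsubscribeEvent> queue, Runnable releaseCallbacks)
            throws InterruptedException {
        UnsubscribeEvent event = queue.take();
        try {
            releaseCallbacks.run();
            event.future.complete(null);
        } catch (RuntimeException e) {
            event.future.completeExceptionally(e);
        }
    }
}
```

In this model, a callback failure surfaces to the caller as an `ExecutionException` from `get`.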
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
kirktrue commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1391862343 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -1093,6 +1099,10 @@ private void subscribeInternal(Collection topics, Optional(topics), listener)) metadata.requestUpdateForNewTopics(); + +// Trigger subscribe event to effectively join the group if not already part of it, +// or just send the new subscription to the broker. +applicationEventHandler.add(new SubscriptionChangeApplicationEvent()); Review Comment: There's another `subscribeInternal()` for the topic pattern path. We want this there too, right? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { -throw new KafkaException("method not implemented"); Review Comment: Good call! ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/UnsubscribeApplicationEvent.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer.internals.events; + +/** + * Application event triggered when a user calls the unsubscribe API. This will make the consumer + * release all its assignments and send a heartbeat request to leave the consumer group. + * This event holds a future that will complete when the invocation of callbacks to release + * complete and the heartbeat to leave the group is sent out (minimal effort to send the + * leave group heartbeat, without waiting for any response or considering timeouts). + */ +public class UnsubscribeApplicationEvent extends CompletableApplicationEvent { Review Comment: The intention of the `CompletableApplicationEvent` was to have a way for the consumer to block on the results of operations performed in the background thread. Since the `Consumer.unsubscribe()` API call is non-blocking, I'm thinking this should be a subclass of `ApplicationEvent`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java: ## @@ -25,7 +25,8 @@ public abstract class ApplicationEvent { public enum Type { COMMIT, POLL, FETCH_COMMITTED_OFFSET, METADATA_UPDATE, ASSIGNMENT_CHANGE, -LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA +LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIPTION_CHANGE, Review Comment: ```suggestion LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, SUBSCRIBED, ``` `SUBSCRIPTION_CHANGE` is a bit vague. Does it encompass more than the event of the user calling `Consumer.subscribe()`?
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,63 +347,537 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", membe
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1390590490 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = LEAVE_GROUP_EPOCH; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * {@inheritDoc} + */ +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +callbackResult = CompletableFuture.completedFuture(null); +} else { +// Release assignment +if (memberEpoch > 0) { +// Member is part of the group. 
Invoke onPartitionsRevoked. +callbackResult = revokePartitions(droppedPartitions); +} else { +// Member is not part of the group anymore. Invoke onPartitionsLost. +callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); +} +} +return callbackResult; +} + +/** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so
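A minimal sketch of the release decision described in the Javadoc above: an empty assignment completes immediately, epoch > 0 means onPartitionsRevoked, and epoch <= 0 means onPartitionsLost. `ReleaseAssignmentSketch` and its methods are hypothetical names, and partitions are modelled as naturally ordered strings standing in for `TopicPartition` with `COMPARATOR`.

```java
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, simplified model of invokeOnPartitionsRevokedOrLostToReleaseAssignment().
// Partitions are plain strings (natural order) standing in for TopicPartition + COMPARATOR.
final class ReleaseAssignmentSketch {

    enum ReleaseCallback { NONE, ON_PARTITIONS_REVOKED, ON_PARTITIONS_LOST }

    private ReleaseAssignmentSketch() {
    }

    // Decide which user callback releases the assignment.
    static ReleaseCallback chooseCallback(SortedSet<String> assignedPartitions, int memberEpoch) {
        if (assignedPartitions.isEmpty()) {
            return ReleaseCallback.NONE;                 // nothing to release
        }
        return memberEpoch > 0
                ? ReleaseCallback.ON_PARTITIONS_REVOKED  // still in the group (e.g. unsubscribe)
                : ReleaseCallback.ON_PARTITIONS_LOST;    // epoch <= 0 (e.g. fenced or fatal)
    }

    // Invoke the chosen callback; an empty assignment yields an already-completed future.
    static CompletableFuture<Void> release(SortedSet<String> assignedPartitions,
                                           int memberEpoch,
                                           Function<SortedSet<String>, CompletableFuture<Void>> onRevoked,
                                           Function<SortedSet<String>, CompletableFuture<Void>> onLost) {
        SortedSet<String> dropped = new TreeSet<>(assignedPartitions); // snapshot of the assignment
        switch (chooseCallback(dropped, memberEpoch)) {
            case ON_PARTITIONS_REVOKED:
                return onRevoked.apply(dropped);
            case ON_PARTITIONS_LOST:
                return onLost.apply(dropped);
            default:
                return CompletableFuture.completedFuture(null);
        }
    }
}
```

Because `transitionToFenced()` resets the epoch and `transitionToFatal()` sets it to the leave epoch before releasing, both paths land on onPartitionsLost in this model.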
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1390583612 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,111 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. + */ +PREPARE_LEAVING, + +/** + * Member has committed offsets and releases its assignment, so it stays in this state until Review Comment: Yes, similar to the ACKNOWLEDGING in the sense that they are just a way to indicate that a heartb
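A sketch of the state machine as the new Javadoc describes it, assuming transitions are validated on each move. The allowed-transition table is inferred from the comments above and is not the PR's actual validation logic; in particular, LEAVING -> UNSUBSCRIBED is an assumption, since the LEAVING Javadoc is truncated here. `SketchMemberState` and `SketchMembership` are hypothetical names.

```java
// Hypothetical sketch of the member states described in the MemberState Javadoc.
enum SketchMemberState {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL;

    // Returns true if this sketch allows moving from this state to next.
    boolean canTransitionTo(SketchMemberState next) {
        switch (this) {
            case UNSUBSCRIBED:
                return next == JOINING;                        // subscribe()
            case JOINING:
                return next == RECONCILING || isFailureOrLeave(next);
            case RECONCILING:
                return next == ACKNOWLEDGING || isFailureOrLeave(next);
            case ACKNOWLEDGING:                                // ack sent: more work or done
                return next == RECONCILING || next == STABLE || isFailureOrLeave(next);
            case STABLE:
                return next == RECONCILING || isFailureOrLeave(next);
            case FENCED:
                return next == JOINING;                        // re-join as a new member
            case PREPARE_LEAVING:
                return next == LEAVING;
            case LEAVING:
                return next == UNSUBSCRIBED;                   // assumption
            case FATAL:
            default:
                return false;                                  // unrecoverable
        }
    }

    // Fencing, fatal errors and unsubscribe can interrupt any active state in this sketch.
    private static boolean isFailureOrLeave(SketchMemberState next) {
        return next == FENCED || next == FATAL || next == PREPARE_LEAVING;
    }
}

// Tracks the current state and rejects transitions the sketch does not allow.
final class SketchMembership {
    private SketchMemberState state = SketchMemberState.UNSUBSCRIBED;

    SketchMemberState state() {
        return state;
    }

    void transitionTo(SketchMemberState next) {
        if (!state.canTransitionTo(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }
}
```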
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1390582429 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { * @param topic to be requested. If empty, return the metadata for all topics. * @return the future of the metadata request. */ -public CompletableFuture>> requestTopicMetadata(final Optional topic) { +public CompletableFuture>> requestTopicMetadata(final Optional topic) { Review Comment: You're right, not needed anymore (all metadata interaction is now based on the centralized metadata cache). All removed.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1389220863 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java: ## @@ -96,7 +96,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { * @param topic to be requested. If empty, return the metadata for all topics. * @return the future of the metadata request. */ -public CompletableFuture>> requestTopicMetadata(final Optional topic) { +public CompletableFuture>> requestTopicMetadata(final Optional topic) { Review Comment: Do we still need the changes in this class? It seems that we don't use them anymore. I have the same question for the new `Topic` class.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1387282757 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberState.java: ## @@ -17,57 +17,111 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.protocol.Errors; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public enum MemberState { /** - * Member has not joined a consumer group yet, or has been fenced and needs to re-join. + * Member has a group id, but it is not subscribed to any topic to receive automatic + * assignments. This will be the state when the member has never subscribed, or when it has + * unsubscribed from all topics. While in this state the member can commit offsets but won't + * be an active member of the consumer group (no heartbeats sent). */ -UNJOINED, +UNSUBSCRIBED, + +/** + * Member is attempting to join a consumer group. While in this state, the member will send + * heartbeat requests on the interval, with epoch 0, until it gets a response with an epoch > 0 + * or a fatal failure. A member transitions to this state when it tries to join the group for + * the first time with a call to subscribe, or when it has been fenced and tries to re-join. + */ +JOINING, /** * Member has received a new target assignment (partitions could have been assigned or - * revoked), and it is processing it. While in this state, the member will - * invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and then make - * the new assignment effective. + * revoked), and it is processing it. While in this state, the member will continue to send + * heartbeat on the interval, and reconcile the assignment (it will commit offsets if + * needed, invoke the user callbacks for onPartitionsAssigned or onPartitionsRevoked, and make + * the new assignment effective). 
Note that while in this state the member may be trying to + * resolve metadata for the target assignment, or triggering commits/callbacks if topic names + * already resolved. */ -// TODO: determine if separate state will be needed for assign/revoke (not for now) RECONCILING, /** - * Member is active in a group (heartbeating) and has processed all assignments received. + * Member has completed reconciling an assignment received, and stays in this state only until + * the next heartbeat request is sent out to acknowledge the assignment to the server. This + * state indicates that the next heartbeat request must be sent without waiting for the + * heartbeat interval to expire. Note that once the ack is sent, the member could go back to + * {@link #RECONCILING} if it still has assignment waiting to be reconciled (assignments + * waiting for metadata, assignments for which metadata was resolved, or new assignments + * received from the broker) + */ +ACKNOWLEDGING, + +/** + * Member is active in a group and has processed all assignments received. While in this + * state, the member will send heartbeats on the interval. */ STABLE, /** - * Member transitions to this state when it receives a - * {@link org.apache.kafka.common.protocol.Errors#UNKNOWN_MEMBER_ID} or - * {@link org.apache.kafka.common.protocol.Errors#FENCED_MEMBER_EPOCH} error from the - * broker. This is a recoverable state, where the member - * gives up its partitions by invoking the user callbacks for onPartitionsLost, and then - * transitions to {@link #UNJOINED} to rejoin the group as a new member. + * Member transitions to this state when it receives a {@link Errors#UNKNOWN_MEMBER_ID} or + * {@link Errors#FENCED_MEMBER_EPOCH} error from the broker, indicating that it has been + * left out of the group. 
While in this state, the member will stop sending heartbeats, it + * will give up its partitions by invoking the user callbacks for onPartitionsLost, and then + * transition to {@link #JOINING} to re-join the group as a new member. */ FENCED, /** - * The member failed with an unrecoverable error + * The member transitions to this state after a call to unsubscribe. While in this state, the + * member will stop sending heartbeats, will commit offsets if needed and release its + * assignment (calling user's callback for partitions revoked or lost). When all these + * actions complete, the member will transition out of this state into {@link #LEAVING} to + * effectively leave the group. + */ +PREPARE_LEAVING, + +/** + * Member has committed offsets and releases its assignment, so it stays in this state until Review Comment: This is just a transient state for the member to send the leave group heartbeat with epoch -1/-
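A sketch of how the states above could map to heartbeat timing, assuming a simple "delay until next heartbeat" policy. ACKNOWLEDGING sending immediately comes from its Javadoc; treating LEAVING the same way is an assumption based on the review comments describing it as a transient state for sending the leave-group heartbeat; no heartbeat in FATAL is also an assumption. `HeartbeatState` and `delayUntilNextHeartbeatMs` are hypothetical names.

```java
// Hypothetical copy of the member states, used only by this sketch.
enum HeartbeatState {
    UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE,
    FENCED, PREPARE_LEAVING, LEAVING, FATAL
}

final class HeartbeatPolicySketch {

    private HeartbeatPolicySketch() {
    }

    // Returns the delay in ms before the next heartbeat, or -1 if no heartbeat should be sent.
    static long delayUntilNextHeartbeatMs(HeartbeatState state, long intervalMs, long msSinceLastHeartbeat) {
        switch (state) {
            case ACKNOWLEDGING: // ack the reconciled assignment without waiting for the interval
            case LEAVING:       // send the leave-group heartbeat right away (assumption)
                return 0L;
            case JOINING:
            case RECONCILING:
            case STABLE:        // regular heartbeats on the interval
                return Math.max(0L, intervalMs - msSinceLastHeartbeat);
            default:            // UNSUBSCRIBED, FENCED, PREPARE_LEAVING, FATAL: no heartbeats
                return -1L;
        }
    }
}
```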
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1385350785 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = LEAVE_GROUP_EPOCH; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * {@inheritDoc} + */ +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +callbackResult = CompletableFuture.completedFuture(null); +} else { +// Release assignment +if (memberEpoch > 0) { +// Member is part of the group. 
Invoke onPartitionsRevoked. +callbackResult = revokePartitions(droppedPartitions); +} else { +// Member is not part of the group anymore. Invoke onPartitionsLost. +callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); +} +} +return callbackResult; +} + +/** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so
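A minimal sketch of the `leaveGroup()` flow in the diff above: the assignment is cleared and the member moves on to send the leave heartbeat whether or not the callback failed, while the caller only waits on the callback future. `LeaveGroupSketch` and its `Phase` enum are hypothetical; the `Supplier` stands in for invoking onPartitionsRevoked/onPartitionsLost.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified model of the leaveGroup() flow quoted above.
final class LeaveGroupSketch {

    enum Phase { STABLE, LEAVING, SENDING_LEAVE_REQUEST }

    final Set<String> assignedPartitions = new HashSet<>();
    Phase phase = Phase.STABLE;

    // releaseAssignment stands in for invoking the user's revoke/lost callbacks.
    CompletableFuture<Void> leaveGroup(Supplier<CompletableFuture<Void>> releaseAssignment) {
        phase = Phase.LEAVING;
        CompletableFuture<Void> callbackResult = releaseAssignment.get();
        callbackResult.whenComplete((result, error) -> {
            // Clear the assignment whether the callback succeeded or failed.
            assignedPartitions.clear();
            // Always move on so that the leave-group heartbeat is still sent.
            phase = Phase.SENDING_LEAVE_REQUEST;
        });
        // The caller waits for the callbacks only, not for the heartbeat to go out.
        return callbackResult;
    }
}
```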
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383658863 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -166,19 +166,41 @@ private static long findMinTime(final Collection request .orElse(Long.MAX_VALUE); } -public void maybeAutoCommit(final Map offsets) { +/** + * Generate a request to commit offsets if auto-commit is enabled. The request will be + * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a + * request if there is no other commit request already in-flight, and if the commit interval + * has elapsed. + * + * @param offsets Offsets to commit + * @return Future that will complete when a response is received for the request, or a + * completed future if no request is generated. + */ +public CompletableFuture maybeAutoCommit(final Map offsets) { if (!autoCommitState.isPresent()) { -return; +return CompletableFuture.completedFuture(null); } AutoCommitState autocommit = autoCommitState.get(); if (!autocommit.canSendAutocommit()) { -return; +return CompletableFuture.completedFuture(null); } -sendAutoCommit(offsets); +CompletableFuture result = sendAutoCommit(offsets); autocommit.resetTimer(); autocommit.setInflightCommitStatus(true); +return result; +} + +/** + * If auto-commit is enabled, this will generate a commit offsets request for all assigned + * partitions and their current positions. + * + * @return Future that will complete when a response is received for the request, or a + * completed future if no request is generated. + */ +public CompletableFuture maybeAutoCommitAllConsumed() { Review Comment: Can we return an optional future here because there's technically nothing to be completed?
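A sketch contrasting the two return styles under discussion: the diff's "always return a future, already completed when no request is generated" versus the suggested `Optional` that is empty when there is nothing to wait for. `AutoCommitSketch` is hypothetical and models only the "auto-commit enabled" and "request in flight" checks; the commit-interval timer is omitted.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the two return styles discussed for maybeAutoCommit().
// Only the enabled and in-flight checks are modelled; the interval timer is omitted.
final class AutoCommitSketch {
    private final boolean autoCommitEnabled;
    private boolean inflight = false;
    int requestsSent = 0;

    AutoCommitSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    private boolean canSend() {
        return autoCommitEnabled && !inflight;
    }

    // Style in the diff: always a future, already completed when no request is generated.
    CompletableFuture<Void> maybeAutoCommit() {
        if (!canSend()) {
            return CompletableFuture.completedFuture(null);
        }
        return send();
    }

    // Alternative raised in review: an empty Optional means "no request was generated".
    Optional<CompletableFuture<Void>> maybeAutoCommitOptional() {
        if (!canSend()) {
            return Optional.empty();
        }
        return Optional.of(send());
    }

    // Stand-in for sendAutoCommit(offsets): the future completes when the response arrives.
    private CompletableFuture<Void> send() {
        inflight = true;
        requestsSent++;
        CompletableFuture<Void> response = new CompletableFuture<>();
        response.whenComplete((result, error) -> inflight = false);
        return response;
    }
}
```

With the `Optional` style, a caller can tell "nothing was sent" apart from "a request was sent and already answered", which a pre-completed future hides.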
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1385085418 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +262,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); +} + +/** + * {@inheritDoc} + */ +@Override +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = LEAVE_GROUP_EPOCH; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +transitionTo(MemberState.FATAL); +} + +/** + * {@inheritDoc} + */ +@Override +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public CompletableFuture leaveGroup() { +transitionTo(MemberState.LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. 
+subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; } +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { +SortedSet droppedPartitions = new TreeSet<>(COMPARATOR); +droppedPartitions.addAll(subscriptions.assignedPartitions()); + +CompletableFuture callbackResult; +if (droppedPartitions.isEmpty()) { +// No assignment to release +callbackResult = CompletableFuture.completedFuture(null); +} else { +// Release assignment +if (memberEpoch > 0) { +// Member is part of the group. Invoke onPartitionsRevoked. +callbackResult = revokePartitions(droppedPartitions); +} else { +// Member is not part of the group anymore. Invoke onPartitionsLost. 
+callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); +} +} +return callbackResult; +} + +/** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so that a heartbeat + * request is sent out with it. + */ +private void transitionToSendingLeaveGroup() { +memberEpoch = leaveGroupEpoch(); +currentAssignment = new HashSet<>(); +targetAssignment = Optional.empty(); +transit
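A minimal sketch of the fenced-to-rejoin path in `transitionToFenced()` above: reset the epoch, release partitions through onPartitionsLost, and, even if that callback fails, log the error, clear the assignment and transition to JOINING. `FencedRejoinSketch` is hypothetical, and its `lastError` field stands in for the debug log.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, simplified model of transitionToFenced() followed by re-joining.
final class FencedRejoinSketch {

    enum State { STABLE, FENCED, JOINING }

    State state = State.STABLE;
    int memberEpoch = 7;                       // arbitrary epoch before fencing
    final Set<String> assigned = new HashSet<>();
    String lastError;                          // stand-in for log.debug(...)

    void transitionToFenced(Function<Set<String>, CompletableFuture<Void>> onPartitionsLost) {
        memberEpoch = 0;                       // resetEpoch(): re-join as a new member
        state = State.FENCED;
        CompletableFuture<Void> callbackResult = assigned.isEmpty()
                ? CompletableFuture.completedFuture(null)
                : onPartitionsLost.apply(new HashSet<>(assigned));
        callbackResult.whenComplete((result, error) -> {
            if (error != null) {
                lastError = error.getMessage(); // logged, not rethrown
            }
            assigned.clear();                  // give up the partitions either way
            state = State.JOINING;             // re-join the group
        });
    }
}
```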
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1385082158 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +307,469 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); Review Comment: Agree, done here. Note my answer above though, about the cases where we do need revoke or lost, that one should stay.
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1385059304 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: Agree that unsubscribe blocks on the callbacks, but since this PR does not include the implementation of how callbacks are going to be executed, this unsubscribe is not using it yet. It should come when we nail down the implementation details in the follow-up PR (it will depend on how we end up doing it; maybe the blocking will happen not here, but on the subscribe/unsubscribe events). As for the exception, it will originate in the application thread, where the callback is executed, and it should only be returned to the user when they call poll, to maintain the current behaviour. To stay consistent and leave all logic related to callback execution out of this PR, I removed it from here; I see the confusion it introduced. The `whenComplete` should stay because it represents a concept we need that is unrelated to callbacks: we need to update the `subscriptionState` only when the Unsubscribe event completes (HB to leave the group sent to the broker). We need it now to make sure we are able to run unsubscribe with no callbacks, send the leave group request, and clear up the subscription state after sending the request, leaving it in a consistent state with no callbacks.
The follow-up PR should introduce the implementation for executing them, blocking appropriately on the execution, and throwing the exceptions.
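A small sketch of why the `whenComplete` can only do state cleanup here: an exception thrown inside a `whenComplete` action is captured in the dependent future that `whenComplete` returns; it is not thrown to the caller of `unsubscribe()`, nor to whoever completes the event future. `UnsubscribeCompletionSketch` and its `subscriptionCleared` flag are hypothetical stand-ins for the consumer and `subscriptionState`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: cleanup waits for the unsubscribe event to complete, and an
// exception thrown inside the whenComplete action only completes the returned future
// exceptionally; it never reaches the code that called unsubscribe().
final class UnsubscribeCompletionSketch {
    boolean subscriptionCleared = false;

    CompletableFuture<Void> unsubscribe(CompletableFuture<Void> unsubscribeEvent) {
        return unsubscribeEvent.whenComplete((result, error) -> {
            // State cleanup that must wait until the event (leave-group heartbeat) completes.
            subscriptionCleared = true;
            if (error != null) {
                // Captured by the returned future, not propagated to any caller.
                throw new IllegalStateException("User rebalance callback throws an error", error);
            }
        });
    }
}
```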
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1384076305 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: I think this KafkaException might not be in the right place, because the rebalance listener needs to be invoked on the main thread, probably before sending out the event. I wonder if we could just remove this whenComplete and rely on the background thread to log failures during the leave group event. If a fatal exception is thrown there, the sensible approach seems to be to enqueue it to the BackgroundEventQueue and handle it in poll. wdyt? In the current code path, I think exceptions can only be thrown in `onLeavePrepare`.
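A minimal sketch of the "enqueue in the background, surface in poll" pattern suggested above. The class and method names are hypothetical, and a `ConcurrentLinkedQueue` stands in for the consumer's actual BackgroundEventQueue.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: the background thread records errors instead of throwing,
// and the application thread rethrows them the next time it calls poll().
final class BackgroundErrorQueueSketch {
    private final ConcurrentLinkedQueue<RuntimeException> backgroundErrors = new ConcurrentLinkedQueue<>();

    // Called from the background thread (e.g. while handling the leave group event).
    void reportFromBackground(RuntimeException error) {
        backgroundErrors.add(error);
    }

    // Called from the application thread; surfaces the oldest queued error, if any.
    void poll() {
        RuntimeException error = backgroundErrors.poll();
        if (error != null) {
            throw error;
        }
        // ... normal poll work (fetching records) would follow here
    }
}
```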
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1384016549 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: I think unsubscribe() actually blocks on callback invocation and throws if possible. Instead of putting the logic in whenComplete, it seems like we should wait until the callback completes, then throw if needed. I assume we want to maintain this behavior for the async consumer.
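A sketch of the blocking alternative: wait for the callback future with a timeout, then rethrow a failure to the caller of `unsubscribe()`. The message text is the one kept from the legacy consumer in the diff; `RebalanceCallbackException` is a hypothetical stand-in for `KafkaException` so the block stays self-contained, and the timeout handling is an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: unsubscribe() blocks until the rebalance callbacks finish and
// rethrows their failure, instead of throwing inside a whenComplete action.
final class BlockingUnsubscribeSketch {

    // Stand-in for KafkaException in this self-contained sketch.
    static final class RebalanceCallbackException extends RuntimeException {
        RebalanceCallbackException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private BlockingUnsubscribeSketch() {
    }

    static void awaitCallbacks(CompletableFuture<Void> callbacksDone, long timeoutMs) {
        try {
            callbacksDone.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // Unwrap the user's exception, as the legacy consumer surfaces it.
            throw new RebalanceCallbackException("User rebalance callback throws an error", e.getCause());
        } catch (TimeoutException e) {
            throw new RebalanceCallbackException("Timed out waiting for rebalance callbacks", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RebalanceCallbackException("Interrupted while waiting for rebalance callbacks", e);
        }
    }
}
```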
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383998319 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: The current consumer throws KafkaException on unsubscribe()
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1384004581 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Topic.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * A topic represented by a universally unique identifier and a topic name. 
+ */ +public class Topic { + +private final Uuid topicId; + +private final String topicName; + +public Topic(Uuid topicId) { +this(topicId, null); +} + +public Topic(String topicName) { +this(null, topicName); +} + +public Topic(Uuid topicId, String topicName) { +this.topicId = topicId; +this.topicName = topicName; +} + +public Uuid topicId() { +return topicId; +} + +public String topicName() { +return topicName; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +Topic other = (Topic) o; +return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName); +} + +@Override +public int hashCode() { +final int prime = 31; +int result = prime + Objects.hashCode(topicId); +result = prime * result + Objects.hashCode(topicName); +return result; +} + +@Override +public String toString() { +return topicId + ":" + topicName; Review Comment: Thanks for the clarification! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383998319 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: The current consumer throws KafkaException on unsubscribe(), I think throwing in the whenComplete doesn't actually throw. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
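The point that throwing inside `whenComplete` does not reach the caller can be checked with a small, dependency-free sketch. The class and method names (`WhenCompleteDemo`, `throwInsideWhenComplete`, `waitAndRethrow`) are hypothetical, and `RuntimeException` stands in for `KafkaException`. `CompletableFuture` catches whatever the callback throws and stores it in the dependent future that `whenComplete` returns. Nobody observes that future here, so the error is silently dropped. This is true whether the original future has already failed or fails later. Blocking with `join()` and rethrowing is one way to surface the error on the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WhenCompleteDemo {

    // Mirrors the reviewed pattern: the throw happens inside the callback.
    // Returns true only if the exception actually reached this caller.
    static boolean throwInsideWhenComplete(CompletableFuture<Void> future) {
        try {
            future.whenComplete((result, error) -> {
                if (error != null) {
                    // RuntimeException stands in for KafkaException.
                    throw new RuntimeException("User rebalance callback throws an error", error);
                }
            });
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // One alternative: wait on the future and rethrow on the calling thread.
    static void waitAndRethrow(CompletableFuture<Void> future) {
        try {
            future.join();
        } catch (CompletionException e) {
            throw new RuntimeException("User rebalance callback throws an error", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("callback failed"));
        System.out.println("reached caller: " + throwInsideWhenComplete(failed));
        try {
            waitAndRethrow(failed);
        } catch (RuntimeException e) {
            System.out.println("rethrown, cause: " + e.getCause());
        }
    }
}
```

As discussed further down the thread, the plan in the PR is to surface callback errors from `poll` instead. Blocking in `unsubscribe` is only shown here to make the difference visible.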
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383995398 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { -throw new KafkaException("method not implemented"); +log.warn("Operation not supported in new consumer group protocol"); } @Override public void enforceRebalance(String reason) { -throw new KafkaException("method not implemented"); +log.warn("Operation not supported in new consumer group protocol"); Review Comment: Thanks for the clarification! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383986418 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Topic.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * A topic represented by a universally unique identifier and a topic name. 
+ */ +public class Topic { + +private final Uuid topicId; + +private final String topicName; + +public Topic(Uuid topicId) { +this(topicId, null); +} + +public Topic(String topicName) { +this(null, topicName); +} + +public Topic(Uuid topicId, String topicName) { +this.topicId = topicId; +this.topicName = topicName; +} + +public Uuid topicId() { +return topicId; +} + +public String topicName() { +return topicName; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +Topic other = (Topic) o; +return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName); +} + +@Override +public int hashCode() { +final int prime = 31; +int result = prime + Objects.hashCode(topicId); +result = prime * result + Objects.hashCode(topicName); +return result; +} + +@Override +public String toString() { +return topicId + ":" + topicName; Review Comment: Actually I intentionally followed the standard of the TopicPartition and TopicIdPartition toString implementations, since this new Topic class is kind of a sibling, makes sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
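To make the `toString` discussion concrete, here is a minimal sketch of the `Topic` class with both formats side by side. It assumes `java.util.UUID` in place of Kafka's `Uuid` so it has no dependencies, which means the printed id will look different from Kafka's. `TopicSketch` and `labelledString` are hypothetical names.

The sketch also shows two properties of the reviewed class:

- `equals` compares both fields, so an id-only instance never equals an id+name instance.
- An id-only instance prints `null` for the name.

`Objects.hash(topicId, topicName)` computes the same value as the prime arithmetic in the diff (`31 + h(id)`, then `31 * result + h(name)`).

```java
import java.util.Objects;
import java.util.UUID;

// Sketch of the reviewed Topic class; java.util.UUID stands in for Kafka's Uuid.
final class TopicSketch {
    private final UUID topicId;     // null when only the name is known
    private final String topicName; // null when only the id is known

    TopicSketch(UUID topicId, String topicName) {
        this.topicId = topicId;
        this.topicName = topicName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TopicSketch other = (TopicSketch) o;
        // Both fields take part: an id-only instance never equals an id+name instance.
        return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName);
    }

    @Override
    public int hashCode() {
        // Null-safe and consistent with equals; same value as the diff's prime arithmetic.
        return Objects.hash(topicId, topicName);
    }

    // Compact "id:name" format used in the PR.
    @Override
    public String toString() {
        return topicId + ":" + topicName;
    }

    // Field-labelled format suggested in the review.
    String labelledString() {
        return "Topic(topicId=" + topicId + ", topicName=" + topicName + ")";
    }

    public static void main(String[] args) {
        UUID id = UUID.fromString("00000000-0000-0000-0000-000000000001");
        TopicSketch idOnly = new TopicSketch(id, null);
        TopicSketch full = new TopicSketch(id, "orders");
        System.out.println(idOnly.equals(full));
        System.out.println(idOnly);
        System.out.println(full.labelledString());
    }
}
```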
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383983917 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { -throw new KafkaException("method not implemented"); +log.warn("Operation not supported in new consumer group protocol"); } @Override public void enforceRebalance(String reason) { -throw new KafkaException("method not implemented"); +log.warn("Operation not supported in new consumer group protocol"); Review Comment: The KIP states that we should do exactly this, and actually the consumer would stay functional. `Consumer#enforceRebalance will be deprecated and will be a no-op if used when the new protocol is enable. A warning will be logged in this case.` Do you have a concern that I may be missing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
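The KIP wording quoted above ("no-op ... A warning will be logged") can be illustrated with a hypothetical sketch in which the consumer stays usable rather than failing. `EnforceRebalanceSketch`, the `GroupProtocol` values and the counters are all assumptions made for illustration. `java.util.logging` replaces the consumer's real logger.

```java
import java.util.logging.Logger;

// Hypothetical sketch: enforceRebalance as a logged no-op under the new protocol.
final class EnforceRebalanceSketch {
    enum GroupProtocol { CLASSIC, CONSUMER } // hypothetical names

    private static final Logger LOG = Logger.getLogger(EnforceRebalanceSketch.class.getName());

    private final GroupProtocol protocol;
    private int rebalancesTriggered = 0; // stand-in for the classic rebalance path
    private int warningsLogged = 0;

    EnforceRebalanceSketch(GroupProtocol protocol) {
        this.protocol = protocol;
    }

    void enforceRebalance(String reason) {
        if (protocol == GroupProtocol.CONSUMER) {
            // No exception: the consumer stays usable and the call has no effect.
            LOG.warning("Operation not supported in new consumer group protocol");
            warningsLogged++;
            return;
        }
        rebalancesTriggered++;
    }

    int rebalancesTriggered() { return rebalancesTriggered; }

    int warningsLogged() { return warningsLogged; }

    public static void main(String[] args) {
        EnforceRebalanceSketch consumer = new EnforceRebalanceSketch(GroupProtocol.CONSUMER);
        consumer.enforceRebalance("manual");
        System.out.println("warnings=" + consumer.warningsLogged());
    }
}
```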
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383979443 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +307,469 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearAssignedTopicNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +clearAssignedTopicNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearAssignedTopicNamesCache(); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * {@inheritDoc} + */ +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearAssignedTopicNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { Review Comment: The reason is the leave group logic. On leave group it could be lost or revoked, depending on the epoch. On fence or fatal is always lost. I was just reusing the same func for convenience, but will change the fence and fatal transitions to directly invoke the `onPartitionsLost` just to make the intention clearer. 
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383974744 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -676,12 +678,12 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { -throw new KafkaException("method not implemented"); +log.warn("Operation not supported in new consumer group protocol"); } @Override public void enforceRebalance(String reason) { -throw new KafkaException("method not implemented"); +log.warn("Operation not supported in new consumer group protocol"); Review Comment: can we just fail the consumer? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383968073 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/Topic.java: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.common.Uuid; + +import java.util.Objects; + +/** + * A topic represented by a universally unique identifier and a topic name. 
+ */ +public class Topic { + +private final Uuid topicId; + +private final String topicName; + +public Topic(Uuid topicId) { +this(topicId, null); +} + +public Topic(String topicName) { +this(null, topicName); +} + +public Topic(Uuid topicId, String topicName) { +this.topicId = topicId; +this.topicName = topicName; +} + +public Uuid topicId() { +return topicId; +} + +public String topicName() { +return topicName; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (o == null || getClass() != o.getClass()) { +return false; +} +Topic other = (Topic) o; +return Objects.equals(topicId, other.topicId) && Objects.equals(topicName, other.topicName); +} + +@Override +public int hashCode() { +final int prime = 31; +int result = prime + Objects.hashCode(topicId); +result = prime * result + Objects.hashCode(topicName); +return result; +} + +@Override +public String toString() { +return topicId + ":" + topicName; Review Comment: Can we use the standard toString format? Topic(topicId=...) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383962388 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: You're right, it should happen in the poll to maintain the current contract. I will just remove it since we don't execute callbacks yet, and it should be included in the PR that introduces the callback execution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383959347 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -149,27 +192,36 @@ public HeartbeatRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { -if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat()) +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) return NetworkClientDelegate.PollResult.EMPTY; -// TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be -// implemented either with or after the partition reconciliation logic. -if (!heartbeatRequestState.canSendRequest(currentTimeMs)) +boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); Review Comment: Yes, as we discussed, this is for the states that send a heartbeat without waiting for the interval (basically, acknowledging an assignment and sending leave-group requests).
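The decision in `poll()` can be sketched as a pure function. Only the first two checks come from the quoted diff. The rest of `poll()` is cut off in the quote, so the interval fallback is an assumption, as are all names (`HeartbeatDecisionSketch`, `Decision`, `intervalElapsed`). In particular, the sketch assumes the regular path also holds back while a request is in flight.

```java
// Hypothetical sketch of the heartbeat decision in HeartbeatRequestManager.poll().
final class HeartbeatDecisionSketch {
    enum Decision { SKIP, SEND_NOW, SEND_ON_INTERVAL, WAIT }

    static Decision decide(boolean coordinatorKnown,
                           boolean shouldSkipHeartbeat,
                           boolean shouldHeartbeatNow,
                           boolean requestInFlight,
                           boolean intervalElapsed) {
        // From the diff: no coordinator, or a member state that must not heartbeat.
        if (!coordinatorKnown || shouldSkipHeartbeat) {
            return Decision.SKIP;
        }
        // From the diff: urgent heartbeat (e.g. assignment ack, leave group) sent
        // without waiting for the interval, but not on top of an in-flight request.
        if (shouldHeartbeatNow && !requestInFlight) {
            return Decision.SEND_NOW;
        }
        // Assumption: the regular path waits for the interval and any in-flight request.
        if (intervalElapsed && !requestInFlight) {
            return Decision.SEND_ON_INTERVAL;
        }
        return Decision.WAIT;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false, true, false, false));
    }
}
```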
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383791886 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -149,27 +192,36 @@ public HeartbeatRequestManager( */ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { -if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat()) +if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat()) return NetworkClientDelegate.PollResult.EMPTY; -// TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be -// implemented either with or after the partition reconciliation logic. -if (!heartbeatRequestState.canSendRequest(currentTimeMs)) +boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); Review Comment: I'm guessing the idea is we need to ignore the backoff and heartbeat. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383786483 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: Looking at the current consumer, I think this could get quite complicated in this implementation. Maybe what we should do is invoke the listener on the spot, then send an event to the background thread to leave the group.
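One way to read the suggestion above is as a hypothetical sketch with the following flow:

- Run the revocation listener on the calling (application) thread.
- Always enqueue a leave-group event for the background thread.
- Rethrow any callback failure to the caller afterwards.

`UnsubscribeSketch`, `RevocationListener` and `LeaveGroupEvent` are illustrative names, not Kafka APIs. `RuntimeException` stands in for `KafkaException`. Leaving even when the callback fails follows the leave-group diff, which clears the assignment "no matter if the callback execution failed or succeeded".

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: invoke the listener on the caller's thread, then hand
// the leave-group work to a background thread through an event queue.
final class UnsubscribeSketch {
    interface RevocationListener {
        void onPartitionsRevoked();
    }

    static final class LeaveGroupEvent { }

    private final BlockingQueue<Object> backgroundEvents = new LinkedBlockingQueue<>();
    private final RevocationListener listener;

    UnsubscribeSketch(RevocationListener listener) {
        this.listener = listener;
    }

    void unsubscribe() {
        RuntimeException callbackError = null;
        try {
            listener.onPartitionsRevoked(); // runs on the caller's thread
        } catch (RuntimeException e) {
            callbackError = e;
        }
        // Leave the group whether or not the callback failed.
        backgroundEvents.add(new LeaveGroupEvent());
        if (callbackError != null) {
            // Reaches the caller, unlike a throw inside whenComplete.
            throw new RuntimeException("User rebalance callback throws an error", callbackError);
        }
    }

    int pendingEvents() {
        return backgroundEvents.size();
    }

    public static void main(String[] args) {
        UnsubscribeSketch consumer = new UnsubscribeSketch(
                () -> System.out.println("revoking on " + Thread.currentThread().getName()));
        consumer.unsubscribe();
        System.out.println("queued events: " + consumer.pendingEvents());
    }
}
```

lianetm's reply below points to surfacing callback errors from `poll` instead, so this is only one of the options discussed.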
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383780173 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -843,7 +845,15 @@ private void updatePatternSubscription(Cluster cluster) { @Override public void unsubscribe() { fetchBuffer.retainAll(Collections.emptySet()); -subscriptions.unsubscribe(); +UnsubscribeApplicationEvent unsubscribeApplicationEvent = new UnsubscribeApplicationEvent(); +applicationEventHandler.add(unsubscribeApplicationEvent); +unsubscribeApplicationEvent.future().whenComplete((result, error) -> { +if (error != null) { +// Callback failed - Keeping same exception message thrown by the legacy consumer +throw new KafkaException("User rebalance callback throws an error", error); Review Comment: I don't think the exception will actually be thrown here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383701313 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +307,469 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); Review Comment: As previously mentioned, it is clearer to directly invoke `invokeOnPartitionsLost` here. Is there a case we don't invoke onPartitionsLost? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383699023 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -181,33 +307,469 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + +// Release assignment +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { +if (error != null) { +log.debug("OnPartitionsLost callback invocation failed while releasing assignment" + +"after member got fenced. Member will rejoin the group anyways.", error); +} +subscriptions.assignFromSubscribed(Collections.emptySet()); +transitionToJoining(); +}); + +clearAssignedTopicNamesCache(); } /** * {@inheritDoc} */ @Override -public void transitionToFailed() { -log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); -transitionTo(MemberState.FAILED); +public void transitionToFatal() { +log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + +// Update epoch to indicate that the member is not in the group anymore, so that the +// onPartitionsLost is called to release assignment. +memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + +clearAssignedTopicNamesCache(); +transitionTo(MemberState.FATAL); } +/** + * {@inheritDoc} + */ @Override -public boolean shouldSendHeartbeat() { -return state() != MemberState.FAILED; +public void transitionToJoining() { +resetEpoch(); +transitionTo(MemberState.JOINING); +clearAssignedTopicNamesCache(); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * {@inheritDoc} + */ +@Override +public CompletableFuture leaveGroup() { +transitionTo(MemberState.PREPARE_LEAVING); + +CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); +callbackResult.whenComplete((result, error) -> { + +// Clear the subscription, no matter if the callback execution failed or succeeded. +subscriptions.assignFromSubscribed(Collections.emptySet()); + +// Transition to ensure that a heartbeat request is sent out to effectively leave the +// group (even in the case where the member had no assignment to release or when the +// callback execution failed.) +transitionToSendingLeaveGroup(); + +}); + +clearAssignedTopicNamesCache(); + +// Return callback future to indicate that the leave group is done when the callbacks +// complete, without waiting for the heartbeat to be sent out. (Best effort to send it +// but do not hold the leave group operation for it) +return callbackResult; +} + +/** + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * + * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe) + * + * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced . + * + * + * @return Future that will complete when the callback execution completes. + */ +private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() { Review Comment: is there any reason we want to combine onPartitionsLost and onPartitionsRevoke into a single function? Couldn't we directly invoke `invokeOnPartitionsRevoke` or `invokeOnPartitionsLost`? i.e. 
by the time the consumer is fenced, we would know that we need to invoke onPartitionsLost.
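The split discussed here (fence always means "lost", while leaving picks revoked or lost by epoch) can be sketched without Kafka dependencies. Everything in `MembershipSketch` is hypothetical:

- The states are a subset of the PR's `MemberState` names.
- `LEAVE_GROUP_EPOCH = -1` is an assumed sentinel value.
- The user callbacks are stand-ins that complete immediately, so the `whenComplete` continuations run synchronously.

In the real consumer the callbacks would complete later. The sketch follows the diff in three respects:

- The assignment is cleared and the state advanced even if the callback fails.
- A fenced member rejoins with epoch 0.
- `leaveGroup` returns the callback future rather than waiting for the leave heartbeat.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical, dependency-free sketch of the assignment-release logic under review.
final class MembershipSketch {
    enum State { STABLE, FENCED, JOINING, PREPARE_LEAVING, SENDING_LEAVE_REQUEST }

    // Assumption: sentinel epoch sent in the leave-group heartbeat.
    static final int LEAVE_GROUP_EPOCH = -1;

    State state = State.STABLE;
    int memberEpoch;
    final Set<String> assigned = new TreeSet<>();
    final StringBuilder callbacks = new StringBuilder(); // records which callback ran

    MembershipSketch(int memberEpoch, String... partitions) {
        this.memberEpoch = memberEpoch;
        Collections.addAll(assigned, partitions);
    }

    // Stand-ins for the user callbacks; both complete immediately here.
    CompletableFuture<Void> onPartitionsRevoked(Set<String> partitions) {
        callbacks.append("revoked").append(partitions);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> onPartitionsLost(Set<String> partitions) {
        callbacks.append("lost").append(partitions);
        return CompletableFuture.completedFuture(null);
    }

    // Fenced: the member is no longer in the group, so always "lost", then rejoin.
    void transitionToFenced() {
        memberEpoch = 0;
        state = State.FENCED;
        CompletableFuture<Void> released = assigned.isEmpty()
                ? CompletableFuture.completedFuture(null)
                : onPartitionsLost(new TreeSet<>(assigned));
        released.whenComplete((result, error) -> {
            assigned.clear();      // drop the assignment even if the callback failed
            state = State.JOINING; // rejoin with epoch 0
        });
    }

    // Leaving: only here does the epoch decide between revoked and lost.
    CompletableFuture<Void> leaveGroup() {
        state = State.PREPARE_LEAVING;
        CompletableFuture<Void> released;
        if (assigned.isEmpty()) {
            released = CompletableFuture.completedFuture(null);
        } else if (memberEpoch > 0) {
            released = onPartitionsRevoked(new TreeSet<>(assigned));
        } else {
            released = onPartitionsLost(new TreeSet<>(assigned));
        }
        released.whenComplete((result, error) -> {
            assigned.clear();
            memberEpoch = LEAVE_GROUP_EPOCH;
            state = State.SENDING_LEAVE_REQUEST; // heartbeat is best effort
        });
        // Completes with the callbacks, not with the leave-group heartbeat.
        return released;
    }

    public static void main(String[] args) {
        MembershipSketch member = new MembershipSketch(3, "t-1", "t-0");
        member.leaveGroup();
        System.out.println(member.callbacks + " -> " + member.state);
    }
}
```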
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
philipnee commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1383658863 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -166,19 +166,41 @@ private static long findMinTime(final Collection request .orElse(Long.MAX_VALUE); } -public void maybeAutoCommit(final Map offsets) { +/** + * Generate a request to commit offsets if auto-commit is enabled. The request will be + * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a + * request if there is no other commit request already in-flight, and if the commit interval + * has elapsed. + * + * @param offsets Offsets to commit + * @return Future that will complete when a response is received for the request, or a + * completed future if no request is generated. + */ +public CompletableFuture maybeAutoCommit(final Map offsets) { if (!autoCommitState.isPresent()) { -return; +return CompletableFuture.completedFuture(null); } AutoCommitState autocommit = autoCommitState.get(); if (!autocommit.canSendAutocommit()) { -return; +return CompletableFuture.completedFuture(null); } -sendAutoCommit(offsets); +CompletableFuture result = sendAutoCommit(offsets); autocommit.resetTimer(); autocommit.setInflightCommitStatus(true); +return result; +} + +/** + * If auto-commit is enabled, this will generate a commit offsets request for all assigned + * partitions and their current positions. + * + * @return Future that will complete when a response is received for the request, or a + * completed future if no request is generated. + */ +public CompletableFuture maybeAutoCommitAllConsumed() { Review Comment: Can we return an optional future here because there's technically nothing to be completed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
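The two return shapes discussed for the auto-commit helpers can be compared in a hypothetical sketch. `AutoCommitSketch` and `maybeAutoCommitOptional` are illustrative names. The commit-interval check and the actual request are left out, and a single in-flight flag stands in for the PR's auto-commit state.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the two return shapes discussed for the auto-commit helpers.
// Interval checks and the actual request are left out.
final class AutoCommitSketch {
    private final boolean autoCommitEnabled;
    private boolean commitInFlight = false;

    AutoCommitSketch(boolean autoCommitEnabled) {
        this.autoCommitEnabled = autoCommitEnabled;
    }

    // Shape in the PR: always a future; "nothing to commit" is an already-completed one.
    CompletableFuture<Void> maybeAutoCommit() {
        if (!autoCommitEnabled || commitInFlight) {
            return CompletableFuture.completedFuture(null);
        }
        commitInFlight = true;
        return new CompletableFuture<>(); // would complete when the commit response arrives
    }

    // Shape suggested in review: "no request generated" is explicit.
    Optional<CompletableFuture<Void>> maybeAutoCommitOptional() {
        if (!autoCommitEnabled || commitInFlight) {
            return Optional.empty();
        }
        commitInFlight = true;
        return Optional.of(new CompletableFuture<Void>());
    }

    public static void main(String[] args) {
        AutoCommitSketch disabled = new AutoCommitSketch(false);
        System.out.println(disabled.maybeAutoCommit().isDone());
        System.out.println(disabled.maybeAutoCommitOptional().isPresent());
    }
}
```

The two shapes trade off differently:

- **Completed future:** callers can chain on the result without a branch, but "completed" then means either "committed" or "no request was generated".
- **`Optional`:** keeps those two cases distinguishable, at the cost of unwrapping at every call site.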
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383623183

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
## @@ -181,33 +256,465 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) {
public void transitionToFenced() {
resetEpoch();
transitionTo(MemberState.FENCED);
+
+// Release assignment
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+if (error != null) {
+log.debug("OnPartitionsLost callback invocation failed while releasing assignment" +
+"after member got fenced. Member will rejoin the group anyways.", error);
+}
+subscriptions.assignFromSubscribed(Collections.emptySet());
+transitionToJoining();
+});
}
/**
* {@inheritDoc}
*/
@Override
-public void transitionToFailed() {
-log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED);
-transitionTo(MemberState.FAILED);
+public void transitionToFatal() {
+log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL);
+
+// Update epoch to indicate that the member is not in the group anymore, so that the
+// onPartitionsLost is called to release assignment.
+memberEpoch = LEAVE_GROUP_EPOCH;
+invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+
+transitionTo(MemberState.FATAL);
}
+/**
+ * {@inheritDoc}
+ */
@Override
-public boolean shouldSendHeartbeat() {
-return state() != MemberState.FAILED;
+public void transitionToJoining() {
+resetEpoch();
+transitionTo(MemberState.JOINING);
}
/**
- * Transition to {@link MemberState#STABLE} only if there are no target assignments left to
- * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+ * {@inheritDoc}
+ */
+@Override
+public CompletableFuture leaveGroup() {
+transitionTo(MemberState.LEAVING);
+
+CompletableFuture callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+callbackResult.whenComplete((result, error) -> {
+
+// Clear the subscription, no matter if the callback execution failed or succeeded.
+subscriptions.assignFromSubscribed(Collections.emptySet());
+
+// Transition to ensure that a heartbeat request is sent out to effectively leave the
+// group (even in the case where the member had no assignment to release or when the
+// callback execution failed.)
+transitionToSendingLeaveGroup();
+
+});
+
+// Return callback future to indicate that the leave group is done when the callbacks
+// complete, without waiting for the heartbeat to be sent out. (Best effort to send it
+// but do not hold the leave group operation for it)
+return callbackResult;
+}
+
+/**
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or
+ * onPartitionsLost.
+ *
+ * If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked.
+ * This will be the case when releasing assignment because the member is intentionally
+ * leaving the group (after a call to unsubscribe)
+ *
+ * If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost.
+ * This will be the case when releasing assignment after being fenced .
+ *
+ *
+ * @return Future that will complete when the callback execution completes.
+ */
+private CompletableFuture invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+SortedSet droppedPartitions = new TreeSet<>(COMPARATOR);
+droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+CompletableFuture callbackResult;
+if (droppedPartitions.isEmpty()) {
+// No assignment to release
+callbackResult = CompletableFuture.completedFuture(null);
+} else {
+// Release assignment
+if (memberEpoch > 0) {
+// Member is part of the group. Invoke onPartitionsRevoked.
+callbackResult = revokePartitions(droppedPartitions);
+} else {
+// Member is not part of the group anymore. Invoke onPartitionsLost.
+callbackResult = invokeOnPartitionsLostCallback(droppedPartitions);
+}
+}
+return callbackResult;
+}
+
+/**
+ * Reset member epoch to the value required for the leave the group heartbeat request, and
+ * transition to the {@link MemberState#SENDING_LEAVE_REQUEST} state so th
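The diff above chooses between onPartitionsRevoked and onPartitionsLost purely on the member epoch, and clears the assignment in `whenComplete` whether or not the user callback failed. A minimal sketch of that decision, assuming partitions are plain strings and the callbacks only append to a log: `ReleaseSketch`, `failCallback` and `callbackLog` are hypothetical, and `LEAVE_GROUP_EPOCH = -1` is an assumed value, not read from the PR.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of releasing an assignment; not the MembershipManagerImpl API.
class ReleaseSketch {
    static final int LEAVE_GROUP_EPOCH = -1; // assumption: negative epoch means "leaving"

    int memberEpoch;
    final Set<String> assigned;
    boolean failCallback = false; // simulate a user callback that throws
    final StringBuilder callbackLog = new StringBuilder();

    ReleaseSketch(int memberEpoch, Set<String> assigned) {
        this.memberEpoch = memberEpoch;
        this.assigned = new TreeSet<>(assigned);
    }

    // epoch > 0: still in the group, so partitions are "revoked";
    // epoch <= 0: no longer in the group, so partitions are "lost".
    CompletableFuture<Void> releaseAssignment() {
        if (assigned.isEmpty()) {
            return CompletableFuture.completedFuture(null); // nothing to release
        }
        Set<String> dropped = new TreeSet<>(assigned);
        callbackLog.append(memberEpoch > 0 ? "revoked:" : "lost:").append(dropped);
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (failCallback) {
            result.completeExceptionally(new IllegalStateException("user callback failed"));
        } else {
            result.complete(null);
        }
        return result;
    }

    // Fenced: reset epoch to 0 first, so the "lost" path is taken, then clear
    // the assignment regardless of the callback outcome.
    CompletableFuture<Void> onFenced() {
        memberEpoch = 0;
        return releaseAssignment().whenComplete((ignored, error) -> assigned.clear());
    }

    // Leaving: the epoch is still > 0, so partitions are revoked; afterwards
    // the leave epoch is set so a leave heartbeat could be sent.
    CompletableFuture<Void> leaveGroup() {
        CompletableFuture<Void> callbackResult = releaseAssignment();
        callbackResult.whenComplete((ignored, error) -> {
            assigned.clear();
            memberEpoch = LEAVE_GROUP_EPOCH;
        });
        return callbackResult;
    }
}
```

As in the diff, `leaveGroup` returns the callback future itself rather than the chained one, so completion signals "callbacks done" without waiting for the leave heartbeat.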
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383610788

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
## @@ -149,15 +149,17 @@ public HeartbeatRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-if (!coordinatorRequestManager.coordinator().isPresent() || !membershipManager.shouldSendHeartbeat())
+if (!coordinatorRequestManager.coordinator().isPresent() || membershipManager.shouldSkipHeartbeat())
return NetworkClientDelegate.PollResult.EMPTY;
-// TODO: We will need to send a heartbeat response after partitions being revoke. This needs to be
-// implemented either with or after the partition reconciliation logic.
-if (!heartbeatRequestState.canSendRequest(currentTimeMs))
+boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
+
+if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
+}
-this.heartbeatRequestState.onSendAttempt(currentTimeMs);
+heartbeatRequestState.onSendAttempt(currentTimeMs);
+membershipManager.onHeartbeatRequestSent();

Review Comment:
I'm using `send` to align with how the HB manager already names things, such as the existing `onSendAttempt`. From the manager's point of view that is enough: the membership manager only needs to know that the request has been sent out of the HB manager so that it can transition accordingly. It is not literally sent over the network at that point. I would keep send/sent for consistency between the two managers, but let me know if you think a different name would be clearer.
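The new `poll` gating combines three conditions: skip entirely when there is no coordinator or the member state says not to heartbeat, otherwise send when the interval allows it or when the membership manager asks for an immediate heartbeat that is not blocked by one already in flight. A minimal decision-table sketch, assuming each input is an opaque boolean (in the real code `canSendRequest` also accounts for timing and backoff): `HeartbeatGateSketch` and `Decision` are hypothetical names.

```java
// Hypothetical model of the gating in HeartbeatRequestManager.poll(); not a Kafka API.
final class HeartbeatGateSketch {
    enum Decision { SKIP, WAIT, SEND }

    private HeartbeatGateSketch() {}

    static Decision decide(boolean coordinatorKnown,
                           boolean shouldSkipHeartbeat,
                           boolean shouldHeartbeatNow,
                           boolean requestInFlight,
                           boolean canSendRequest) {
        // No coordinator, or the member state does not heartbeat at all.
        if (!coordinatorKnown || shouldSkipHeartbeat) {
            return Decision.SKIP;
        }
        // An "urgent" heartbeat bypasses the interval, but never overlaps
        // with a heartbeat that is still awaiting its response.
        boolean heartbeatNow = shouldHeartbeatNow && !requestInFlight;
        if (!canSendRequest && !heartbeatNow) {
            return Decision.WAIT; // real code returns nextHeartbeatMs as the poll delay
        }
        return Decision.SEND; // real code then calls onSendAttempt and onHeartbeatRequestSent
    }
}
```

Guarding `heartbeatNow` with `!requestInFlight` is what prevents a member that keeps asking for an immediate heartbeat from flooding the coordinator with concurrent requests.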
Re: [PR] KAFKA-15574; [1/N]: Client state machine updates [kafka]
lianetm commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1383599082

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java:
## @@ -73,30 +84,55 @@ public interface MembershipManager {
/**
* @return Current assignment for the member.
*/
-ConsumerGroupHeartbeatResponseData.Assignment currentAssignment();
+Set currentAssignment();

Review Comment:
I will share thoughts on this at our next sync with @dajac and you, @AndrewJSchofield, as this is interesting. Just for the record, I do agree that spreading topic IDs is the right way forward, and we do have them now in the assignment path. However, while exploring this suggestion I realized that moving away from `TopicPartition` is a much bigger change, and we're not ready for it at this point. It is mainly tricky (and ugly) because not all paths support topic IDs yet, but most of them access or update the shared `subscriptionState` component and `TopicPartition`.

At this point it looks to me that we're better off using the topic IDs "on the side", as we're doing now in the `membershipManager`, which keeps the assigned topic IDs/names but still uses the same `TopicPartition`. This is the same approach followed by the `Fetch` and `Metadata` paths, which introduced topic IDs in a similar "on the side" way. There's interesting food for thought here anyway.
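The "on the side" approach described above can be pictured as a small ID-to-name map that translates an ID-based assignment into the name-based form the shared components still expect. A minimal sketch under loud assumptions: `java.util.UUID` stands in for Kafka's own topic ID type, a `"topic-partition"` string stands in for `TopicPartition`, and `TopicIdSideMapSketch`, `onMetadata` and `resolve` are hypothetical. Setting aside IDs whose names are not yet known is one plausible policy, not necessarily what the PR does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical sketch: keep topic IDs in a side map while the rest of the
// client keeps working with name-based partitions.
final class TopicIdSideMapSketch {
    private final Map<UUID, String> topicNamesById = new HashMap<>();

    // Record an ID -> name mapping, e.g. learned from a metadata response.
    void onMetadata(UUID topicId, String topicName) {
        topicNamesById.put(topicId, topicName);
    }

    // Stand-in for TopicPartition: "name-partition".
    static String topicPartition(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Translate an ID-based assignment to name-based partitions. IDs with no
    // known name are collected in `unresolved` instead of being dropped.
    Set<String> resolve(Map<UUID, Set<Integer>> assignmentById, Set<UUID> unresolved) {
        Set<String> resolved = new TreeSet<>();
        for (Map.Entry<UUID, Set<Integer>> entry : assignmentById.entrySet()) {
            String topicName = topicNamesById.get(entry.getKey());
            if (topicName == null) {
                unresolved.add(entry.getKey());
                continue;
            }
            for (int partition : entry.getValue()) {
                resolved.add(topicPartition(topicName, partition));
            }
        }
        return resolved;
    }
}
```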