dajac commented on code in PR #14690: URL: https://github.com/apache/kafka/pull/14690#discussion_r1395677889
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -17,25 +17,93 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; /** - * Membership manager that maintains group membership for a single member, following the new - * consumer group protocol. + * Group manager for a single consumer that has a group id defined in the config + * {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset management capability, + * and the consumer group protocol to get automatically assigned partitions when calling the + * subscribe API. 
+ * + * <p/> + * + * While the subscribe API hasn't been called (or if the consumer called unsubscribe), this manager + * will only be responsible for keeping the member in the {@link MemberState#UNSUBSCRIBED} state, + * where it can commit offsets to the group identified by the {@link #groupId()}, without joining + * the group. + * + * <p/> + * + * If the consumer subscribe API is called, this manager will use the {@link #groupId()} to join the + * consumer group, and based on the consumer group protocol heartbeats, will handle the full + * lifecycle of the member as it joins the group, reconciles assignments, handles fencing and + * fatal errors, and leaves the group. + * * <p/> - * This is responsible for: - * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li> - * <li>Keeping member state as defined in {@link MemberState}.</li> + * + * Reconciliation process:<p/> + * The member accepts all assignments received from the broker, resolves topic names from + * metadata, reconciles the resolved assignments, and keeps the unresolved to be reconciled when + * discovered with a metadata update. Reconciliations of resolved assignments are executed + * sequentially and acknowledged to the server as they complete. The reconciliation process + * involves multiple async operations, so the member will continue to heartbeat while these + * operations complete, to make sure that the member stays in the group while reconciling. + * * <p/> - * Member info and state are updated based on the heartbeat responses the member receives. + * + * Reconciliation steps: + * <ol> + * <li>Resolve topic names for all topic IDs received in the target assignment. Topic names + * found in metadata are then ready to be reconciled. 
Topic IDs not found are kept as + * unresolved, and the member request metadata updates until it resolves them (or the broker + * removes it from the target assignment.</li> + * <li>Commit offsets if auto-commit is enabled.</li> + * <li>Invoke the user-defined onPartitionsRevoked listener.</li> + * <li>Invoke the user-defined onPartitionsAssigned listener.</li> + * <li>When the above steps complete, the member acknowledges the reconciled assignment, + * which is the subset of the target that was resolved from metadata and actually reconciled. + * The ack is performed by sending a heartbeat request back to the broker, including the + * reconciled assignment. + * .</li> Review Comment: nit: I suppose that this one should go on the previous line. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); Review Comment: nit: It may be worth logging something here as well to be consistent with `transitionToFatal`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -168,10 +319,37 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { this.memberId = response.memberId(); this.memberEpoch = response.memberEpoch(); ConsumerGroupHeartbeatResponseData.Assignment assignment = response.assignment(); + if (assignment != null) { - setTargetAssignment(assignment); + transitionTo(MemberState.RECONCILING); + replaceUnresolvedAssignmentWithNewAssignment(assignment); + resolveMetadataForUnresolvedAssignment(); + // TODO: improve reconciliation triggering. Initial approach of triggering on every + // HB response and metadata update. Review Comment: I would remove the TODOs for which we have Jiras.
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. 
+ memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. } + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. 
+ */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. + subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); + + // Return future to indicate that the leave group is done when the callbacks + // complete, and the heartbeat to be sent out. (Best effort to send it, without waiting + // for a response or handling timeouts) + return leaveGroupInProgress.get(); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. */ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. + callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + // Remove all topic IDs and names from local cache + callbackResult.whenComplete((result, error) -> clearPendingAssignmentsAndLocalNamesCache()); } - return state.equals(MemberState.STABLE); + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave the group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat + * request is sent out with it. 
+ */ + private void transitionToSendingLeaveGroup() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment = new HashSet<>(); + transitionTo(MemberState.LEAVING); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldHeartbeatNow() { + MemberState state = state(); + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; } /** - * Take new target assignment received from the server and set it as targetAssignment to be - * processed. Following the consumer group protocol, the server won't send a new target - * member while a previous one hasn't been acknowledged by the member, so this will fail - * if a target assignment already exists. + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSent() { + MemberState state = state(); + if (state == MemberState.ACKNOWLEDGING) { + if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); + } else { + transitionTo(MemberState.RECONCILING); + } + } else if (state == MemberState.LEAVING) { + transitionTo(MemberState.UNSUBSCRIBED); + leaveGroupInProgress.get().complete(null); + leaveGroupInProgress = Optional.empty(); + } + } + + /** + * @return True if there are no assignments waiting to be resolved from metadata or reconciled. + */ + private boolean allPendingAssignmentsReconciled() { + return assignmentUnresolved.isEmpty() && assignmentReadyToReconcile.isEmpty(); + } + + @Override + public boolean shouldSkipHeartbeat() { + MemberState state = state(); + return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; + } + + /** + * Reconcile the assignment that has been received from the server and for which topic names + * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, + * trigger the callbacks and update the subscription state. Note that only one reconciliation + * can be in progress at a time. 
If there is already another one in progress when this is + * triggered, it will be no-op, and the assignment will be reconciled on the next + * reconciliation loop. + */ + boolean reconcile() { + // Make copy of the assignment to reconcile as it could change as new assignments or metadata updates are received + SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR); + assignedPartitions.addAll(assignmentReadyToReconcile); + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + boolean sameAssignmentReceived = assignedPartitions.equals(ownedPartitions); + + if (reconciliationInProgress || sameAssignmentReceived) { + String reason; + if (reconciliationInProgress) { + reason = "Another reconciliation is already in progress. Assignment " + + assignmentReadyToReconcile + " will be handled in the next reconciliation loop."; + } else { + reason = "Target assignment ready to reconcile is equals to the member current assignment."; + } + log.debug("Ignoring reconciliation attempt. 
" + reason); + return false; + } + + markReconciliationInProgress(); + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR); + addedPartitions.addAll(assignedPartitions); + addedPartitions.removeAll(ownedPartitions); + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedPartitions); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). + commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(__ -> { + boolean memberHasRejoined = !Objects.equals(memberEpochOnReconciliationStart, + memberEpoch); Review Comment: nit: We could use `==` here now. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); Review Comment: I wonder why we clear the assignment here, whereas in transitionToFenced we clear it only once the callback future completes. Is there a reason for this subtle difference?
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -17,253 +17,1059 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; -import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class MembershipManagerImplTest { private static final String GROUP_ID = "test-group"; private static final String MEMBER_ID = "test-member-1"; private static final int MEMBER_EPOCH = 1; - private final LogContext logContext = new LogContext(); + + private 
SubscriptionState subscriptionState; + private ConsumerMetadata metadata; + + private CommitRequestManager commitRequestManager; + + private ConsumerTestBuilder testBuilder; + + @BeforeEach + public void setup() { + testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation()); + metadata = testBuilder.metadata; + subscriptionState = testBuilder.subscriptions; + commitRequestManager = testBuilder.commitRequestManager.get(); + } + + @AfterEach + public void tearDown() { + if (testBuilder != null) { + testBuilder.close(); + } + } + + private MembershipManagerImpl createMembershipManagerJoiningGroup() { + MembershipManagerImpl manager = spy(new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, + metadata, testBuilder.logContext)); + manager.transitionToJoining(); + return manager; + } + + private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupInstanceId, + String serverAssignor) { + MembershipManagerImpl manager = new MembershipManagerImpl( + GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(serverAssignor), + subscriptionState, commitRequestManager, metadata, testBuilder.logContext); + manager.transitionToJoining(); + return manager; + } @Test public void testMembershipManagerServerAssignor() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); assertEquals(Optional.empty(), membershipManager.serverAssignor()); - membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext); + membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform"); assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor()); } @Test public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { - new MembershipManagerImpl(GROUP_ID, logContext); - new MembershipManagerImpl(GROUP_ID, null, null, logContext); + 
createMembershipManagerJoiningGroup(); + createMembershipManagerJoiningGroup(null, null); + } + + @Test + public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() { + // First join should register to get metadata updates + MembershipManagerImpl manager = new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, + metadata, testBuilder.logContext); + manager.transitionToJoining(); + verify(metadata).addClusterUpdateListener(manager); + clearInvocations(metadata); + + // Following joins should not register again. + receiveEmptyAssignment(manager); + mockLeaveGroup(); + manager.leaveGroup(); + assertEquals(MemberState.LEAVING, manager.state()); + manager.onHeartbeatRequestSent(); + assertEquals(MemberState.UNSUBSCRIBED, manager.state()); + manager.transitionToJoining(); + verify(metadata, never()).addClusterUpdateListener(manager); + } + + @Test + public void testReconcilingWhenReceivingAssignmentFoundInMetadata() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(true); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // When the ack is sent the member should go back to STABLE + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); } @Test public void testTransitionToReconcilingOnlyIfAssignmentReceived() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + assertEquals(MemberState.JOINING, membershipManager.state()); ConsumerGroupHeartbeatResponse responseWithoutAssignment = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(responseWithoutAssignment.data()); + membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data()); assertNotEquals(MemberState.RECONCILING, membershipManager.state()); 
ConsumerGroupHeartbeatResponse responseWithAssignment = - createConsumerGroupHeartbeatResponse(createAssignment()); - membershipManager.updateState(responseWithAssignment.data()); + createConsumerGroupHeartbeatResponse(createAssignment(true)); + membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } @Test public void testMemberIdAndEpochResetOnFencedMembers() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = - createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); + mockMemberHasAutoAssignedPartition(); + membershipManager.transitionToFenced(); - assertFalse(membershipManager.memberId().isEmpty()); + assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(0, membershipManager.memberEpoch()); } @Test - public void testTransitionToFailure() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + public void testTransitionToFatal() { + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); 
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); - membershipManager.transitionToFailed(); - assertEquals(MemberState.FAILED, membershipManager.state()); + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + assertEquals(MemberState.FATAL, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testTransitionToFailedWhenTryingToJoin() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, metadata, + testBuilder.logContext); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.transitionToJoining(); + + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + assertEquals(MemberState.FATAL, membershipManager.state()); } @Test public void testFencingWhenStateIsStable() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + MembershipManager membershipManager = createMemberInStableState(); + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testFencingWhenStateIsReconciling() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(false); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + /** + * This is the case where a member is stuck reconciling and transition out of the RECONCILING + * state (due to failure). 
When the reconciliation completes it should not be applied because + * it is not relevant anymore (it should not update the assignment on the member or send ack). + */ + @Test + public void testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + Set<TopicPartition> owned = Collections.singleton(new TopicPartition(topic1, 0)); + mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true); + + // Reconciliation that does not complete stuck on revocation commit. + CompletableFuture<Void> commitResult = mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager); + + // Member received fatal error while reconciling + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Complete commit request + commitResult.complete(null); + + // Member should not update the subscription or send ack when the delayed reconciliation + // completed. + verify(subscriptionState, never()).assignFromSubscribed(anySet()); + assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + } + + /** + * This is the case where a member is stuck reconciling an assignment A (waiting on + * metadata, commit or callbacks), and it rejoins (due to fence or unsubscribe/subscribe). If + * the reconciliation of A completes it should not be applied (it should not update the + * assignment on the member or send ack). 
+ */ + @Test + public void testDelayedReconciliationResultDiscardedIfMemberRejoins() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + Set<TopicPartition> owned = Collections.singleton(new TopicPartition(topic1, 0)); + mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true); + + // Reconciliation that does not complete stuck on revocation commit. + CompletableFuture<Void> commitResult = + mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId1, topic1, + Arrays.asList(1, 2), true); + Set<TopicPartition> assignment1 = new HashSet<>(); + assignment1.add(new TopicPartition(topic1, 1)); + assignment1.add(new TopicPartition(topic1, 2)); + assertEquals(assignment1, membershipManager.assignmentReadyToReconcile()); + int currentEpoch = membershipManager.memberEpoch(); + + // Get fenced and rejoin while still reconciling. Get new assignment to reconcile after + // rejoining. + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + clearInvocations(subscriptionState); + + // Get new assignment A2 after rejoining. This should not trigger a reconciliation just + // yet because there is another on in progress, but should keep the new assignment ready + // to be reconciled next. + Uuid topicId3 = Uuid.randomUuid(); + mockOwnedPartitionAndAssignmentReceived(topicId3, "topic3", owned, true); + receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), membershipManager); + verifyReconciliationNotTriggered(membershipManager); + Set<TopicPartition> assignmentAfterRejoin = Collections.singleton(new TopicPartition("topic3", 5)); + assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile()); + + // Reconciliation completes when the member has already re-joined the group. Should not + // update the subscription state or send ack. 
+ commitResult.complete(null); + verify(subscriptionState, never()).assignFromSubscribed(anyCollection()); + assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // Assignment received after rejoining should be ready to reconcile on the next + // reconciliation loop. + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile()); + } + + /** + * This is the case where a member is stuck reconciling an assignment A (waiting on + * metadata, commit or callbacks), and the target assignment changes (due to new topics added + * to metadata, or new assignment received from broker). If the reconciliation of A completes + * t should be applied (should update the assignment on the member and send ack), and then + * the reconciliation of assignment B will be processed and applied in the next + * reconciliation loop. + */ + @Test + public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataUpdate() { + // Member receives and reconciles topic1-partition0 + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + MembershipManagerImpl membershipManager = + mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, topic1, Arrays.asList(0)); + membershipManager.onHeartbeatRequestSent(); assertEquals(MemberState.STABLE, membershipManager.state()); + clearInvocations(membershipManager, subscriptionState); + when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(new TopicPartition(topic1, 0))); + + // New assignment revoking the partitions owned and adding a new one (not in metadata). + // Reconciliation triggered for topic 1 (stuck on revocation commit) and topic2 waiting + // for metadata. 
+ Uuid topicId2 = Uuid.randomUuid(); + String topic2 = "topic2"; + CompletableFuture<Void> commitResult = + mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId2, topic2, + Arrays.asList(1, 2), false); + verify(metadata).requestUpdate(anyBoolean()); + assertEquals(Collections.singleton(topicId2), membershipManager.topicsWaitingForMetadata()); - testStateUpdateOnFenceError(membershipManager); + // Metadata discovered for topic2 while reconciliation in progress to revoke topic1. + // Should not trigger a new reconciliation because there is one already in progress. + mockTopicNameInMetadataCache(Collections.singletonMap(topicId2, topic2), true); + membershipManager.onUpdate(null); + assertEquals(Collections.emptySet(), membershipManager.topicsWaitingForMetadata()); + verifyReconciliationNotTriggered(membershipManager); + + // Reconciliation in progress completes. Should be applied revoking topic 1 only. Newly + // discovered topic2 will be reconciled in the next reconciliation loop. + commitResult.complete(null); + + // Member should update the subscription and send ack when the delayed reconciliation + // completes. + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // Pending assignment that was discovered in metadata should be ready to reconcile in the + // next reconciliation loop. 
+ Set<TopicPartition> topic2Assignment = new HashSet<>(Arrays.asList( + new TopicPartition(topic2, 1), + new TopicPartition(topic2, 2))); + assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); + assertEquals(topic2Assignment, membershipManager.assignmentReadyToReconcile()); } @Test - public void testFencingWhenStateIsReconciling() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(createAssignment()); - membershipManager.updateState(heartbeatResponse.data()); + public void testLeaveGroupWhenStateIsStable() { + MembershipManager membershipManager = createMemberInStableState(); + testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testLeaveGroupWhenStateIsReconciling() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(false); assertEquals(MemberState.RECONCILING, membershipManager.state()); - testStateUpdateOnFenceError(membershipManager); + testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager); + } + + @Test + public void testLeaveGroupWhenMemberAlreadyLeaving() { + MembershipManager membershipManager = createMemberInStableState(); + + // First leave attempt. Should trigger the callbacks and stay LEAVING until + // callbacks complete and the heartbeat is sent out. + mockLeaveGroup(); + CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup(); + assertFalse(leaveResult1.isDone()); + assertEquals(MemberState.LEAVING, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Second leave attempt while the first one has not completed yet. 
Should not + // trigger any callbacks, and return a future that will complete when the ongoing first + // leave operation completes. + mockLeaveGroup(); + CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup(); + verify(subscriptionState, never()).rebalanceListener(); + assertFalse(leaveResult2.isDone()); + + // Complete first leave group operation. Should also complete the second leave group. + membershipManager.onHeartbeatRequestSent(); + assertTrue(leaveResult1.isDone()); + assertFalse(leaveResult1.isCompletedExceptionally()); + assertTrue(leaveResult2.isDone()); + assertFalse(leaveResult2.isCompletedExceptionally()); + + // Subscription should have been updated only once with the first leave group. + verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testLeaveGroupWhenMemberAlreadyLeft() { + MembershipManager membershipManager = createMemberInStableState(); + + // Leave group triggered and completed + mockLeaveGroup(); + CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup(); + assertEquals(MemberState.LEAVING, membershipManager.state()); + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + assertTrue(leaveResult1.isDone()); + assertFalse(leaveResult1.isCompletedExceptionally()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Call to leave group again, when member already left. 
Should be no-op (no callbacks, + // no assignment updated) + mockLeaveGroup(); + CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup(); + assertTrue(leaveResult2.isDone()); + assertFalse(leaveResult2.isCompletedExceptionally()); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + verify(subscriptionState, never()).rebalanceListener(); + verify(subscriptionState, never()).assignFromSubscribed(Collections.emptySet()); } @Test public void testFatalFailureWhenStateIsUnjoined() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + assertEquals(MemberState.JOINING, membershipManager.state()); testStateUpdateOnFatalFailure(membershipManager); } @Test public void testFatalFailureWhenStateIsStable() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); testStateUpdateOnFatalFailure(membershipManager); } - @Test - public void testFencingShouldNotHappenWhenStateIsUnjoined() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); - - // Getting fenced when the member is not part of the group is not expected and should - // fail with invalid transition. 
- assertThrows(IllegalStateException.class, membershipManager::transitionToFenced); - } - @Test public void testUpdateStateFailsOnResponsesWithErrors() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); // Updating state with a heartbeat response containing errors cannot be performed and // should fail. ConsumerGroupHeartbeatResponse unknownMemberResponse = createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID); assertThrows(IllegalArgumentException.class, - () -> membershipManager.updateState(unknownMemberResponse.data())); + () -> membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data())); } + /** + * This test should be the case when an assignment is sent to the member, and it cannot find + * it in metadata (permanently, ex. topic deleted). The member will keep the assignment as + * waiting for metadata, but the following assignment received from the broker will not + * contain the deleted topic. The member will discard the assignment that was pending and + * proceed with the reconciliation of the new assignment. 
+ */ @Test - public void testAssignmentUpdatedAsReceivedAndProcessed() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponseData.Assignment newAssignment = createAssignment(); - ConsumerGroupHeartbeatResponse heartbeatResponse = - createConsumerGroupHeartbeatResponse(newAssignment); - membershipManager.updateState(heartbeatResponse.data()); + public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() { + MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + // When the ack is sent the member should go back to RECONCILING + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0); + + // New target assignment received while there is another one waiting to be resolved + // and reconciled. This assignment does not include the previous one that is waiting + // for metadata, so the member will discard the topics that were waiting for metadata, and + // reconcile the new assignment. 
+ Uuid topicId = Uuid.randomUuid(); + String topicName = "topic1"; + when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, topicName)); + receiveAssignment(topicId, Collections.singletonList(0), membershipManager); + Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition(topicName, 0)); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(expectedAssignment); + + // When ack for the reconciled assignment is sent, member should go back to STABLE + // because the first assignment that was not resolved should have been discarded + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); + assertTrue(membershipManager.topicsWaitingForMetadata().isEmpty()); + } + + /** + * This test should be the case when an assignment is sent to the member, and it cannot find + * it in metadata (temporarily). The broker will continue to send the assignment to the + * member. The member should keep it was waiting for metadata and continue to request updates. Review Comment: `The broker will continue to send the assignment to the member.` This is not entirely true. The broker may not send anything. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. 
Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); Review Comment: Is there a reason why we don't do this as the first thing in this method? This would be more consistent with transitionToFenced. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. 
Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. } + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. 
+ */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. + */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. 
+ subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); + + // Return future to indicate that the leave group is done when the callbacks + // complete, and the heartbeat to be sent out. (Best effort to send it, without waiting + // for a response or handling timeouts) + return leaveGroupInProgress.get(); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. + * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <=0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced .</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. 
*/ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. + callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + // Remove all topic IDs and names from local cache + callbackResult.whenComplete((result, error) -> clearPendingAssignmentsAndLocalNamesCache()); Review Comment: I am a bit confuse by where we call `clearPendingAssignmentsAndLocalNamesCache`. Sometime we call it when the callback future is completed, sometime right after scheduling the callback. I wonder if it would be possible to be more consistent or if there are specific reasons that I did not get. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. 
Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); Review Comment: nit: Should we also log the member epoch? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; Review Comment: Is this still valid? It looks like we call onPartitionsLost explicitly now. 
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. 
+ memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. } + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. 
+ */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. + subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); Review Comment: Note that here we clear the `assignedTopicNamesCache` but we only clear the assigned partitions after the callback is executed. This means that we have a period of time during which we are not able to resolve ids from the names in the subscription. I suppose that it does not matter in this case but this could be a source of subtile bugs. 
As we discussed offline, I think that we really need to update the subscriptions to use TopicIdPartitions. If this is not possible, an intermediate approach would be to keep the assigned TopicIdPartitions in this manager and to update the subscriptions with `subscriptions.assignFromSubscribed` when we update it. Or, we could also move the bookkeeping of the cache closer to calls to `subscriptions.assignFromSubscribed`. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. 
+ memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. } + /** + * Transition to the {@link MemberState#JOINING} state, indicating that the member will + * try to join the group on the next heartbeat request. This is expected to be invoked when + * the user calls the subscribe API, or when the member wants to rejoin after getting fenced. + * Visible for testing. + */ + void transitionToJoining() { + if (state == MemberState.FATAL) { + log.warn("No action taken to join the group with the updated subscription because " + + "the member is in FATAL state"); + return; + } + resetEpoch(); + transitionTo(MemberState.JOINING); + clearPendingAssignmentsAndLocalNamesCache(); + registerForMetadataUpdates(); + } + + /** + * Register to get notified when the cluster metadata is updated, via the + * {@link #onUpdate(ClusterResource)}. Register only if the manager is not register already. 
+ */ + private void registerForMetadataUpdates() { + if (!isRegisteredForMetadataUpdates) { + this.metadata.addClusterUpdateListener(this); + isRegisteredForMetadataUpdates = true; + } + } + + /** + * {@inheritDoc} + */ @Override - public boolean shouldSendHeartbeat() { - return state() != MemberState.FAILED; + public CompletableFuture<Void> leaveGroup() { + if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) { + // Member is not part of the group. No-op and return completed future to avoid + // unnecessary transitions. + return CompletableFuture.completedFuture(null); + } + + if (state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING) { + // Member already leaving. No-op and return existing leave group future that will + // complete when the ongoing leave operation completes. + return leaveGroupInProgress.get(); + } + + transitionTo(MemberState.PREPARE_LEAVING); + leaveGroupInProgress = Optional.of(new CompletableFuture<>()); + + CompletableFuture<Void> callbackResult = invokeOnPartitionsRevokedOrLostToReleaseAssignment(); + callbackResult.whenComplete((result, error) -> { + // Clear the subscription, no matter if the callback execution failed or succeeded. + subscriptions.assignFromSubscribed(Collections.emptySet()); + + // Transition to ensure that a heartbeat request is sent out to effectively leave the + // group (even in the case where the member had no assignment to release or when the + // callback execution failed.) + transitionToSendingLeaveGroup(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); + + // Return future to indicate that the leave group is done when the callbacks + // complete, and the heartbeat to be sent out. (Best effort to send it, without waiting + // for a response or handling timeouts) + return leaveGroupInProgress.get(); } /** - * Transition to {@link MemberState#STABLE} only if there are no target assignments left to - * reconcile. Transition to {@link MemberState#RECONCILING} otherwise. 
+ * Release member assignment by calling the user defined callbacks for onPartitionsRevoked or + * onPartitionsLost. + * <ul> + * <li>If the member is part of the group (epoch > 0), this will invoke onPartitionsRevoked. + * This will be the case when releasing assignment because the member is intentionally + * leaving the group (after a call to unsubscribe)</li> + * + * <li>If the member is not part of the group (epoch <= 0), this will invoke onPartitionsLost. + * This will be the case when releasing assignment after being fenced.</li> + * </ul> + * + * @return Future that will complete when the callback execution completes. */ - private boolean maybeTransitionToStable() { - if (!hasPendingTargetAssignment()) { - transitionTo(MemberState.STABLE); + private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() { + SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(COMPARATOR); + droppedPartitions.addAll(subscriptions.assignedPartitions()); + + CompletableFuture<Void> callbackResult; + if (droppedPartitions.isEmpty()) { + // No assignment to release + callbackResult = CompletableFuture.completedFuture(null); } else { - transitionTo(MemberState.RECONCILING); + // Release assignment + if (memberEpoch > 0) { + // Member is part of the group. Invoke onPartitionsRevoked. + callbackResult = revokePartitions(droppedPartitions); + } else { + // Member is not part of the group anymore. Invoke onPartitionsLost. + callbackResult = invokeOnPartitionsLostCallback(droppedPartitions); + } + // Remove all topic IDs and names from local cache + callbackResult.whenComplete((result, error) -> clearPendingAssignmentsAndLocalNamesCache()); } - return state.equals(MemberState.STABLE); + return callbackResult; + } + + /** + * Reset member epoch to the value required for the leave group heartbeat request, and + * transition to the {@link MemberState#LEAVING} state so that a heartbeat + * request is sent out with it.
+ */ + private void transitionToSendingLeaveGroup() { + memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + currentAssignment = new HashSet<>(); + transitionTo(MemberState.LEAVING); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean shouldHeartbeatNow() { + MemberState state = state(); + return state == MemberState.ACKNOWLEDGING || state == MemberState.LEAVING; } /** - * Take new target assignment received from the server and set it as targetAssignment to be - * processed. Following the consumer group protocol, the server won't send a new target - * member while a previous one hasn't been acknowledged by the member, so this will fail - * if a target assignment already exists. + * {@inheritDoc} + */ + @Override + public void onHeartbeatRequestSent() { + MemberState state = state(); + if (state == MemberState.ACKNOWLEDGING) { + if (allPendingAssignmentsReconciled()) { + transitionTo(MemberState.STABLE); + } else { + transitionTo(MemberState.RECONCILING); + } + } else if (state == MemberState.LEAVING) { + transitionTo(MemberState.UNSUBSCRIBED); + leaveGroupInProgress.get().complete(null); + leaveGroupInProgress = Optional.empty(); + } + } + + /** + * @return True if there are no assignments waiting to be resolved from metadata or reconciled. + */ + private boolean allPendingAssignmentsReconciled() { + return assignmentUnresolved.isEmpty() && assignmentReadyToReconcile.isEmpty(); + } + + @Override + public boolean shouldSkipHeartbeat() { + MemberState state = state(); + return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL; + } + + /** + * Reconcile the assignment that has been received from the server and for which topic names + * are resolved, kept in the {@link #assignmentReadyToReconcile}. This will commit if needed, + * trigger the callbacks and update the subscription state. Note that only one reconciliation + * can be in progress at a time. 
If there is already another one in progress when this is + * triggered, it will be no-op, and the assignment will be reconciled on the next + * reconciliation loop. + */ + boolean reconcile() { + // Make copy of the assignment to reconcile as it could change as new assignments or metadata updates are received + SortedSet<TopicPartition> assignedPartitions = new TreeSet<>(COMPARATOR); + assignedPartitions.addAll(assignmentReadyToReconcile); + + SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR); + ownedPartitions.addAll(subscriptions.assignedPartitions()); + + boolean sameAssignmentReceived = assignedPartitions.equals(ownedPartitions); + + if (reconciliationInProgress || sameAssignmentReceived) { + String reason; + if (reconciliationInProgress) { + reason = "Another reconciliation is already in progress. Assignment " + + assignmentReadyToReconcile + " will be handled in the next reconciliation loop."; + } else { + reason = "Target assignment ready to reconcile is equals to the member current assignment."; + } + log.debug("Ignoring reconciliation attempt. 
" + reason); + return false; + } + + markReconciliationInProgress(); + + // Partitions to assign (not previously owned) + SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR); + addedPartitions.addAll(assignedPartitions); + addedPartitions.removeAll(ownedPartitions); + + // Partitions to revoke + SortedSet<TopicPartition> revokedPartitions = new TreeSet<>(COMPARATOR); + revokedPartitions.addAll(ownedPartitions); + revokedPartitions.removeAll(assignedPartitions); + + log.info("Updating assignment with\n" + + "\tAssigned partitions: {}\n" + + "\tCurrent owned partitions: {}\n" + + "\tAdded partitions (assigned - owned): {}\n" + + "\tRevoked partitions (owned - assigned): {}\n", + assignedPartitions, + ownedPartitions, + addedPartitions, + revokedPartitions + ); + + CompletableFuture<Void> revocationResult; + if (!revokedPartitions.isEmpty()) { + revocationResult = revokePartitions(revokedPartitions); + } else { + revocationResult = CompletableFuture.completedFuture(null); + // Reschedule the auto commit starting from now (new assignment received without any + // revocation). 
+ commitRequestManager.resetAutoCommitTimer(); + } + + // Future that will complete when the full reconciliation process completes (revocation + // and assignment, executed sequentially) + CompletableFuture<Void> reconciliationResult = + revocationResult.thenCompose(__ -> { + boolean memberHasRejoined = !Objects.equals(memberEpochOnReconciliationStart, + memberEpoch); + if (state == MemberState.RECONCILING && !memberHasRejoined) { + // Apply assignment + CompletableFuture<Void> assignResult = assignPartitions(assignedPartitions, addedPartitions); + + // Clear topic names cache only for topics that are not in the subscription anymore + for (TopicPartition tp : revokedPartitions) { + if (!subscriptions.subscription().contains(tp.topic())) { + assignedTopicNamesCache.values().remove(tp.topic()); + } + } + return assignResult; + } else { + String reason; + if (state != MemberState.RECONCILING) { + reason = "The member already transitioned out of the reconciling " + + "state into " + state; + } else { + reason = "The member has re-joined the group."; + } + // Revocation callback completed but the reconciled assignment should not + // be applied (not relevant anymore). This could be because the member + // is not in the RECONCILING state anymore (fenced, failed, unsubscribed), + // or because it has already re-joined the group. + CompletableFuture<Void> res = new CompletableFuture<>(); + res.completeExceptionally(new KafkaException("Interrupting reconciliation" + + " after revocation. " + reason)); + return res; + } + }); + + reconciliationResult.whenComplete((result, error) -> { + markReconciliationCompleted(); + if (error != null) { + // Leaving member in RECONCILING state after callbacks fail. The member + // won't send the ack, and the expectation is that the broker will kick the + // member out of the group after the rebalance timeout expires, leading to a + // RECONCILING -> FENCED transition. 
+ log.error("Reconciliation failed.", error); + } else { + if (state == MemberState.RECONCILING) { Review Comment: Do we need to check the epoch here as well? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ########## @@ -181,63 +359,592 @@ public void updateState(ConsumerGroupHeartbeatResponseData response) { public void transitionToFenced() { resetEpoch(); transitionTo(MemberState.FENCED); + + // Release assignment + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member got fenced. Member will rejoin the group anyways.", error); + } + subscriptions.assignFromSubscribed(Collections.emptySet()); + transitionToJoining(); + }); + + clearPendingAssignmentsAndLocalNamesCache(); } /** * {@inheritDoc} */ @Override - public void transitionToFailed() { - log.error("Member {} transitioned to {} state", memberId, MemberState.FAILED); - transitionTo(MemberState.FAILED); + public void transitionToFatal() { + log.error("Member {} transitioned to {} state", memberId, MemberState.FATAL); + + // Update epoch to indicate that the member is not in the group anymore, so that the + // onPartitionsLost is called to release assignment. 
+ memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; + CompletableFuture<Void> callbackResult = invokeOnPartitionsLostCallback(subscriptions.assignedPartitions()); + callbackResult.whenComplete((result, error) -> { + if (error != null) { + log.error("onPartitionsLost callback invocation failed while releasing assignment" + + "after member failed with fatal error.", error); + } + }); + subscriptions.assignFromSubscribed(Collections.emptySet()); + clearPendingAssignmentsAndLocalNamesCache(); + transitionTo(MemberState.FATAL); + } + + /** + * {@inheritDoc} + */ + public void onSubscriptionUpdated() { + if (state == MemberState.UNSUBSCRIBED) { + transitionToJoining(); + } + // TODO: If the member is already part of the group, this should only ensure that the + // updated subscription is included in the next heartbeat request. Review Comment: I think that the next HB should pick it up. Otherwise, we could perhaps transition to Joining to force an immediate HB. I am not sure that it is worth it. 
########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java: ########## @@ -17,253 +17,1059 @@ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse; -import org.apache.kafka.common.utils.LogContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anySet; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class MembershipManagerImplTest { private static final String GROUP_ID = "test-group"; private static final String MEMBER_ID = "test-member-1"; private static final int MEMBER_EPOCH = 1; - private final LogContext logContext = new LogContext(); + + private 
SubscriptionState subscriptionState; + private ConsumerMetadata metadata; + + private CommitRequestManager commitRequestManager; + + private ConsumerTestBuilder testBuilder; + + @BeforeEach + public void setup() { + testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation()); + metadata = testBuilder.metadata; + subscriptionState = testBuilder.subscriptions; + commitRequestManager = testBuilder.commitRequestManager.get(); + } + + @AfterEach + public void tearDown() { + if (testBuilder != null) { + testBuilder.close(); + } + } + + private MembershipManagerImpl createMembershipManagerJoiningGroup() { + MembershipManagerImpl manager = spy(new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, + metadata, testBuilder.logContext)); + manager.transitionToJoining(); + return manager; + } + + private MembershipManagerImpl createMembershipManagerJoiningGroup(String groupInstanceId, + String serverAssignor) { + MembershipManagerImpl manager = new MembershipManagerImpl( + GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(serverAssignor), + subscriptionState, commitRequestManager, metadata, testBuilder.logContext); + manager.transitionToJoining(); + return manager; + } @Test public void testMembershipManagerServerAssignor() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); assertEquals(Optional.empty(), membershipManager.serverAssignor()); - membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", "Uniform", logContext); + membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform"); assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor()); } @Test public void testMembershipManagerInitSupportsEmptyGroupInstanceId() { - new MembershipManagerImpl(GROUP_ID, logContext); - new MembershipManagerImpl(GROUP_ID, null, null, logContext); + 
createMembershipManagerJoiningGroup(); + createMembershipManagerJoiningGroup(null, null); + } + + @Test + public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() { + // First join should register to get metadata updates + MembershipManagerImpl manager = new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, + metadata, testBuilder.logContext); + manager.transitionToJoining(); + verify(metadata).addClusterUpdateListener(manager); + clearInvocations(metadata); + + // Following joins should not register again. + receiveEmptyAssignment(manager); + mockLeaveGroup(); + manager.leaveGroup(); + assertEquals(MemberState.LEAVING, manager.state()); + manager.onHeartbeatRequestSent(); + assertEquals(MemberState.UNSUBSCRIBED, manager.state()); + manager.transitionToJoining(); + verify(metadata, never()).addClusterUpdateListener(manager); + } + + @Test + public void testReconcilingWhenReceivingAssignmentFoundInMetadata() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(true); + assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // When the ack is sent the member should go back to STABLE + membershipManager.onHeartbeatRequestSent(); + assertEquals(MemberState.STABLE, membershipManager.state()); } @Test public void testTransitionToReconcilingOnlyIfAssignmentReceived() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - assertEquals(MemberState.UNJOINED, membershipManager.state()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + assertEquals(MemberState.JOINING, membershipManager.state()); ConsumerGroupHeartbeatResponse responseWithoutAssignment = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(responseWithoutAssignment.data()); + membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data()); assertNotEquals(MemberState.RECONCILING, membershipManager.state()); 
ConsumerGroupHeartbeatResponse responseWithAssignment = - createConsumerGroupHeartbeatResponse(createAssignment()); - membershipManager.updateState(responseWithAssignment.data()); + createConsumerGroupHeartbeatResponse(createAssignment(true)); + membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data()); assertEquals(MemberState.RECONCILING, membershipManager.state()); } @Test public void testMemberIdAndEpochResetOnFencedMembers() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = - createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); + ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); + mockMemberHasAutoAssignedPartition(); + membershipManager.transitionToFenced(); - assertFalse(membershipManager.memberId().isEmpty()); + assertEquals(MEMBER_ID, membershipManager.memberId()); assertEquals(0, membershipManager.memberEpoch()); } @Test - public void testTransitionToFailure() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); + public void testTransitionToFatal() { + MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(); ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data()); assertEquals(MemberState.STABLE, membershipManager.state()); assertEquals(MEMBER_ID, membershipManager.memberId()); 
assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch()); - membershipManager.transitionToFailed(); - assertEquals(MemberState.FAILED, membershipManager.state()); + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + assertEquals(MemberState.FATAL, membershipManager.state()); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testTransitionToFailedWhenTryingToJoin() { + MembershipManagerImpl membershipManager = new MembershipManagerImpl( + GROUP_ID, subscriptionState, commitRequestManager, metadata, + testBuilder.logContext); + assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); + membershipManager.transitionToJoining(); + + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + assertEquals(MemberState.FATAL, membershipManager.state()); } @Test public void testFencingWhenStateIsStable() { - MembershipManagerImpl membershipManager = new MembershipManagerImpl(GROUP_ID, logContext); - ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(null); - membershipManager.updateState(heartbeatResponse.data()); + MembershipManager membershipManager = createMemberInStableState(); + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + @Test + public void testFencingWhenStateIsReconciling() { + MembershipManager membershipManager = mockJoinAndReceiveAssignment(false); + assertEquals(MemberState.RECONCILING, membershipManager.state()); + + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + } + + /** + * This is the case where a member is stuck reconciling and transitions out of the RECONCILING + * state (due to failure).
When the reconciliation completes it should not be applied because + * it is not relevant anymore (it should not update the assignment on the member or send ack). + */ + @Test + public void testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + Set<TopicPartition> owned = Collections.singleton(new TopicPartition(topic1, 0)); + mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true); + + // Reconciliation that does not complete stuck on revocation commit. + CompletableFuture<Void> commitResult = mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager); + + // Member received fatal error while reconciling + when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true); + membershipManager.transitionToFatal(); + verify(subscriptionState).assignFromSubscribed(Collections.emptySet()); + clearInvocations(subscriptionState); + + // Complete commit request + commitResult.complete(null); + + // Member should not update the subscription or send ack when the delayed reconciliation + // completed. + verify(subscriptionState, never()).assignFromSubscribed(anySet()); + assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + } + + /** + * This is the case where a member is stuck reconciling an assignment A (waiting on + * metadata, commit or callbacks), and it rejoins (due to fence or unsubscribe/subscribe). If + * the reconciliation of A completes it should not be applied (it should not update the + * assignment on the member or send ack). 
+ */ + @Test + public void testDelayedReconciliationResultDiscardedIfMemberRejoins() { + MembershipManagerImpl membershipManager = createMemberInStableState(); + Uuid topicId1 = Uuid.randomUuid(); + String topic1 = "topic1"; + Set<TopicPartition> owned = Collections.singleton(new TopicPartition(topic1, 0)); + mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true); + + // Reconciliation that does not complete stuck on revocation commit. + CompletableFuture<Void> commitResult = + mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, topicId1, topic1, + Arrays.asList(1, 2), true); + Set<TopicPartition> assignment1 = new HashSet<>(); + assignment1.add(new TopicPartition(topic1, 1)); + assignment1.add(new TopicPartition(topic1, 2)); + assertEquals(assignment1, membershipManager.assignmentReadyToReconcile()); + int currentEpoch = membershipManager.memberEpoch(); + + // Get fenced and rejoin while still reconciling. Get new assignment to reconcile after + // rejoining. + testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager); + clearInvocations(subscriptionState); + + // Get new assignment A2 after rejoining. This should not trigger a reconciliation just + // yet because there is another one in progress, but should keep the new assignment ready + // to be reconciled next. + Uuid topicId3 = Uuid.randomUuid(); + mockOwnedPartitionAndAssignmentReceived(topicId3, "topic3", owned, true); + receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), membershipManager); + verifyReconciliationNotTriggered(membershipManager); + Set<TopicPartition> assignmentAfterRejoin = Collections.singleton(new TopicPartition("topic3", 5)); + assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile()); + + // Reconciliation completes when the member has already re-joined the group. Should not + // update the subscription state or send ack.
+ commitResult.complete(null); + verify(subscriptionState, never()).assignFromSubscribed(anyCollection()); + assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state()); + + // Assignment received after rejoining should be ready to reconcile on the next + // reconciliation loop. + assertEquals(MemberState.RECONCILING, membershipManager.state()); + assertEquals(assignmentAfterRejoin, membershipManager.assignmentReadyToReconcile()); + } + + /** + * This is the case where a member is stuck reconciling an assignment A (waiting on + * metadata, commit or callbacks), and the target assignment changes (due to new topics added + * to metadata, or new assignment received from broker). If the reconciliation of A completes + * t should be applied (should update the assignment on the member and send ack), and then Review Comment: nit: `t` -> `it`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org