dajac commented on code in PR #14690:
URL: https://github.com/apache/kafka/pull/14690#discussion_r1395677889


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -17,25 +17,93 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import 
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 
 /**
- * Membership manager that maintains group membership for a single member, 
following the new
- * consumer group protocol.
+ * Group manager for a single consumer that has a group id defined in the 
config
+ * {@link ConsumerConfig#GROUP_ID_CONFIG}, to use the Kafka-based offset 
management capability,
+ * and the consumer group protocol to get automatically assigned partitions 
when calling the
+ * subscribe API.
+ *
+ * <p/>
+ *
+ * While the subscribe API hasn't been called (or if the consumer called 
unsubscribe), this manager
+ * will only be responsible for keeping the member in the {@link 
MemberState#UNSUBSCRIBED} state,
+ * where it can commit offsets to the group identified by the {@link 
#groupId()}, without joining
+ * the group.
+ *
+ * <p/>
+ *
+ * If the consumer subscribe API is called, this manager will use the {@link 
#groupId()} to join the
+ * consumer group, and based on the consumer group protocol heartbeats, will 
handle the full
+ * lifecycle of the member as it joins the group, reconciles assignments, 
handles fencing and
+ * fatal errors, and leaves the group.
+ *
  * <p/>
- * This is responsible for:
- * <li>Keeping member info (ex. member id, member epoch, assignment, etc.)</li>
- * <li>Keeping member state as defined in {@link MemberState}.</li>
+ *
+ * Reconciliation process:<p/>
+ * The member accepts all assignments received from the broker, resolves topic 
names from
+ * metadata, reconciles the resolved assignments, and keeps the unresolved to 
be reconciled when
+ * discovered with a metadata update. Reconciliations of resolved assignments 
are executed
+ * sequentially and acknowledged to the server as they complete. The 
reconciliation process
+ * involves multiple async operations, so the member will continue to 
heartbeat while these
+ * operations complete, to make sure that the member stays in the group while 
reconciling.
+ *
  * <p/>
- * Member info and state are updated based on the heartbeat responses the 
member receives.
+ *
+ * Reconciliation steps:
+ * <ol>
+ *     <li>Resolve topic names for all topic IDs received in the target 
assignment. Topic names
+ *     found in metadata are then ready to be reconciled. Topic IDs not found 
are kept as
+ *     unresolved, and the member request metadata updates until it resolves 
them (or the broker
+ *     removes it from the target assignment.</li>
+ *     <li>Commit offsets if auto-commit is enabled.</li>
+ *     <li>Invoke the user-defined onPartitionsRevoked listener.</li>
+ *     <li>Invoke the user-defined onPartitionsAssigned listener.</li>
+ *     <li>When the above steps complete, the member acknowledges the 
reconciled assignment,
+ *     which is the subset of the target that was resolved from metadata and 
actually reconciled.
+ *     The ack is performed by sending a heartbeat request back to the broker, 
including the
+ *     reconciled assignment.
+ *     .</li>

Review Comment:
   nit: I suppose that this one should go on the previous line.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();

Review Comment:
   nit: It may be worth logging something here as well to be consistent with 
`transitionToFatal`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -168,10 +319,37 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
         this.memberId = response.memberId();
         this.memberEpoch = response.memberEpoch();
         ConsumerGroupHeartbeatResponseData.Assignment assignment = 
response.assignment();
+
         if (assignment != null) {
-            setTargetAssignment(assignment);
+            transitionTo(MemberState.RECONCILING);
+            replaceUnresolvedAssignmentWithNewAssignment(assignment);
+            resolveMetadataForUnresolvedAssignment();
+            // TODO: improve reconciliation triggering. Initial approach of 
triggering on every
+            //  HB response and metadata update.

Review Comment:
   I would remove the TODOs for which we have Jiras.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only 
ensure that the
+        //  updated subscription is included in the next heartbeat request.
     }
 
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        registerForMetadataUpdates();
+    }
+
+    /**
+     * Register to get notified when the cluster metadata is updated, via the
+     * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+     */
+    private void registerForMetadataUpdates() {
+        if (!isRegisteredForMetadataUpdates) {
+            this.metadata.addClusterUpdateListener(this);
+            isRegisteredForMetadataUpdates = true;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public CompletableFuture<Void> leaveGroup() {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+            // Member is not part of the group. No-op and return completed 
future to avoid
+            // unnecessary transitions.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+            // Member already leaving. No-op and return existing leave group 
future that will
+            // complete when the ongoing leave operation completes.
+            return leaveGroupInProgress.get();
+        }
+
+        transitionTo(MemberState.PREPARE_LEAVING);
+        leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return future to indicate that the leave group is done when the 
callbacks
+        // complete, and the heartbeat to be sent out. (Best effort to send 
it, without waiting
+        // for a response or handling timeouts)
+        return leaveGroupInProgress.get();
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
         }
-        return state.equals(MemberState.STABLE);
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
+            leaveGroupInProgress.get().complete(null);
+            leaveGroupInProgress = Optional.empty();
+        }
+    }
+
+    /**
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(COMPARATOR);
+        assignedPartitions.addAll(assignmentReadyToReconcile);
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(ownedPartitions);
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt. " + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+        addedPartitions.addAll(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(__ -> {
+                    boolean memberHasRejoined = 
!Objects.equals(memberEpochOnReconciliationStart,
+                            memberEpoch);

Review Comment:
   nit: We could use `==` here now.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());

Review Comment:
   I wonder why we clear it here whereas in transitionToFenced we clear it when 
the callback future is completed. Is there a reason for this subtle difference?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -17,253 +17,1059 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class MembershipManagerImplTest {
 
     private static final String GROUP_ID = "test-group";
     private static final String MEMBER_ID = "test-member-1";
     private static final int MEMBER_EPOCH = 1;
-    private final LogContext logContext = new LogContext();
+
+    private SubscriptionState subscriptionState;
+    private ConsumerMetadata metadata;
+
+    private CommitRequestManager commitRequestManager;
+
+    private ConsumerTestBuilder testBuilder;
+
+    @BeforeEach
+    public void setup() {
+        testBuilder = new 
ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
+        metadata = testBuilder.metadata;
+        subscriptionState = testBuilder.subscriptions;
+        commitRequestManager = testBuilder.commitRequestManager.get();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (testBuilder != null) {
+            testBuilder.close();
+        }
+    }
+
+    private MembershipManagerImpl createMembershipManagerJoiningGroup() {
+        MembershipManagerImpl manager = spy(new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager,
+                metadata, testBuilder.logContext));
+        manager.transitionToJoining();
+        return manager;
+    }
+
+    private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId,
+                                                                      String 
serverAssignor) {
+        MembershipManagerImpl manager = new MembershipManagerImpl(
+                GROUP_ID, Optional.ofNullable(groupInstanceId), 
Optional.ofNullable(serverAssignor),
+                subscriptionState, commitRequestManager, metadata, 
testBuilder.logContext);
+        manager.transitionToJoining();
+        return manager;
+    }
 
     @Test
     public void testMembershipManagerServerAssignor() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-        membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", 
"Uniform", logContext);
+        membershipManager = createMembershipManagerJoiningGroup("instance1", 
"Uniform");
         assertEquals(Optional.of("Uniform"), 
membershipManager.serverAssignor());
     }
 
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
-        new MembershipManagerImpl(GROUP_ID, logContext);
-        new MembershipManagerImpl(GROUP_ID, null, null, logContext);
+        createMembershipManagerJoiningGroup();
+        createMembershipManagerJoiningGroup(null, null);
+    }
+
+    @Test
+    public void 
testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() {
+        // First join should register to get metadata updates
+        MembershipManagerImpl manager = new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager,
+                metadata, testBuilder.logContext);
+        manager.transitionToJoining();
+        verify(metadata).addClusterUpdateListener(manager);
+        clearInvocations(metadata);
+
+        // Following joins should not register again.
+        receiveEmptyAssignment(manager);
+        mockLeaveGroup();
+        manager.leaveGroup();
+        assertEquals(MemberState.LEAVING, manager.state());
+        manager.onHeartbeatRequestSent();
+        assertEquals(MemberState.UNSUBSCRIBED, manager.state());
+        manager.transitionToJoining();
+        verify(metadata, never()).addClusterUpdateListener(manager);
+    }
+
+    @Test
+    public void testReconcilingWhenReceivingAssignmentFoundInMetadata() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(true);
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // When the ack is sent the member should go back to STABLE
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
     }
 
     @Test
     public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        assertEquals(MemberState.JOINING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithoutAssignment =
                 createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(responseWithoutAssignment.data());
+        
membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data());
         assertNotEquals(MemberState.RECONCILING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithAssignment =
-                createConsumerGroupHeartbeatResponse(createAssignment());
-        membershipManager.updateState(responseWithAssignment.data());
+                createConsumerGroupHeartbeatResponse(createAssignment(true));
+        
membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
     @Test
     public void testMemberIdAndEpochResetOnFencedMembers() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse =
-                createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
+        mockMemberHasAutoAssignedPartition();
+
         membershipManager.transitionToFenced();
-        assertFalse(membershipManager.memberId().isEmpty());
+        assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(0, membershipManager.memberEpoch());
     }
 
     @Test
-    public void testTransitionToFailure() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+    public void testTransitionToFatal() {
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
-        membershipManager.transitionToFailed();
-        assertEquals(MemberState.FAILED, membershipManager.state());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testTransitionToFailedWhenTryingToJoin() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager, metadata,
+                testBuilder.logContext);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.transitionToJoining();
+
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
     }
 
     @Test
     public void testFencingWhenStateIsStable() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        MembershipManager membershipManager = createMemberInStableState();
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testFencingWhenStateIsReconciling() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(false);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling and transitions out 
of the RECONCILING
+     * state (due to failure). When the reconciliation completes it should not 
be applied because
+     * it is not relevant anymore (it should not update the assignment on the 
member or send ack).
+     */
+    @Test
+    public void 
testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        Set<TopicPartition> owned = Collections.singleton(new 
TopicPartition(topic1, 0));
+        mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true);
+
+        // Reconciliation that does not complete stuck on revocation commit.
+        CompletableFuture<Void> commitResult = 
mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager);
+
+        // Member received fatal error while reconciling
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Complete commit request
+        commitResult.complete(null);
+
+        // Member should not update the subscription or send ack when the 
delayed reconciliation
+        // completed.
+        verify(subscriptionState, never()).assignFromSubscribed(anySet());
+        assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling an assignment A 
(waiting on
+     * metadata, commit or callbacks), and it rejoins (due to fence or 
unsubscribe/subscribe). If
+     * the reconciliation of A completes it should not be applied (it should 
not update the
+     * assignment on the member or send ack).
+     */
+    @Test
+    public void testDelayedReconciliationResultDiscardedIfMemberRejoins() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        Set<TopicPartition> owned = Collections.singleton(new 
TopicPartition(topic1, 0));
+        mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true);
+
+        // Reconciliation that does not complete stuck on revocation commit.
+        CompletableFuture<Void> commitResult =
+                mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, 
topicId1, topic1,
+                        Arrays.asList(1, 2), true);
+        Set<TopicPartition> assignment1 = new HashSet<>();
+        assignment1.add(new TopicPartition(topic1, 1));
+        assignment1.add(new TopicPartition(topic1, 2));
+        assertEquals(assignment1, 
membershipManager.assignmentReadyToReconcile());
+        int currentEpoch = membershipManager.memberEpoch();
+
+        // Get fenced and rejoin while still reconciling. Get new assignment 
to reconcile after
+        // rejoining.
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        clearInvocations(subscriptionState);
+
+        // Get new assignment A2 after rejoining. This should not trigger a 
reconciliation just
+        // yet because there is another one in progress, but should keep the 
new assignment ready
+        // to be reconciled next.
+        Uuid topicId3 = Uuid.randomUuid();
+        mockOwnedPartitionAndAssignmentReceived(topicId3, "topic3", owned, 
true);
+        receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), 
membershipManager);
+        verifyReconciliationNotTriggered(membershipManager);
+        Set<TopicPartition> assignmentAfterRejoin = Collections.singleton(new 
TopicPartition("topic3", 5));
+        assertEquals(assignmentAfterRejoin, 
membershipManager.assignmentReadyToReconcile());
+
+        // Reconciliation completes when the member has already re-joined the 
group. Should not
+        // update the subscription state or send ack.
+        commitResult.complete(null);
+        verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
+        assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // Assignment received after rejoining should be ready to reconcile on 
the next
+        // reconciliation loop.
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(assignmentAfterRejoin, 
membershipManager.assignmentReadyToReconcile());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling an assignment A 
(waiting on
+     * metadata, commit or callbacks), and the target assignment changes (due 
to new topics added
+     * to metadata, or new assignment received from broker). If the 
reconciliation of A completes
+     * it should be applied (should update the assignment on the member and 
send ack), and then
+     * the reconciliation of assignment B will be processed and applied in the 
next
+     * reconciliation loop.
+     */
+    @Test
+    public void 
testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataUpdate() {
+        // Member receives and reconciles topic1-partition0
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        MembershipManagerImpl membershipManager =
+                mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, 
topic1, Arrays.asList(0));
+        membershipManager.onHeartbeatRequestSent();
         assertEquals(MemberState.STABLE, membershipManager.state());
+        clearInvocations(membershipManager, subscriptionState);
+        
when(subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(new
 TopicPartition(topic1, 0)));
+
+        // New assignment revoking the partitions owned and adding a new one 
(not in metadata).
+        // Reconciliation triggered for topic 1 (stuck on revocation commit) 
and topic2 waiting
+        // for metadata.
+        Uuid topicId2 = Uuid.randomUuid();
+        String topic2 = "topic2";
+        CompletableFuture<Void> commitResult =
+                mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, 
topicId2, topic2,
+                        Arrays.asList(1, 2), false);
+        verify(metadata).requestUpdate(anyBoolean());
+        assertEquals(Collections.singleton(topicId2), 
membershipManager.topicsWaitingForMetadata());
 
-        testStateUpdateOnFenceError(membershipManager);
+        // Metadata discovered for topic2 while reconciliation in progress to 
revoke topic1.
+        // Should not trigger a new reconciliation because there is one 
already in progress.
+        mockTopicNameInMetadataCache(Collections.singletonMap(topicId2, 
topic2), true);
+        membershipManager.onUpdate(null);
+        assertEquals(Collections.emptySet(), 
membershipManager.topicsWaitingForMetadata());
+        verifyReconciliationNotTriggered(membershipManager);
+
+        // Reconciliation in progress completes. Should be applied revoking 
topic 1 only. Newly
+        // discovered topic2 will be reconciled in the next reconciliation 
loop.
+        commitResult.complete(null);
+
+        // Member should update the subscription and send ack when the delayed 
reconciliation
+        // completes.
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // Pending assignment that was discovered in metadata should be ready 
to reconcile in the
+        // next reconciliation loop.
+        Set<TopicPartition> topic2Assignment = new HashSet<>(Arrays.asList(
+                new TopicPartition(topic2, 1),
+                new TopicPartition(topic2, 2)));
+        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
+        assertEquals(topic2Assignment, 
membershipManager.assignmentReadyToReconcile());
     }
 
     @Test
-    public void testFencingWhenStateIsReconciling() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(createAssignment());
-        membershipManager.updateState(heartbeatResponse.data());
+    public void testLeaveGroupWhenStateIsStable() {
+        MembershipManager membershipManager = createMemberInStableState();
+        
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testLeaveGroupWhenStateIsReconciling() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(false);
         assertEquals(MemberState.RECONCILING, membershipManager.state());
 
-        testStateUpdateOnFenceError(membershipManager);
+        
testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
+    }
+
+    @Test
+    public void testLeaveGroupWhenMemberAlreadyLeaving() {
+        MembershipManager membershipManager = createMemberInStableState();
+
+        // First leave attempt. Should trigger the callbacks and stay LEAVING 
until
+        // callbacks complete and the heartbeat is sent out.
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        assertFalse(leaveResult1.isDone());
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Second leave attempt while the first one has not completed yet. 
Should not
+        // trigger any callbacks, and return a future that will complete when 
the ongoing first
+        // leave operation completes.
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+        verify(subscriptionState, never()).rebalanceListener();
+        assertFalse(leaveResult2.isDone());
+
+        // Complete first leave group operation. Should also complete the 
second leave group.
+        membershipManager.onHeartbeatRequestSent();
+        assertTrue(leaveResult1.isDone());
+        assertFalse(leaveResult1.isCompletedExceptionally());
+        assertTrue(leaveResult2.isDone());
+        assertFalse(leaveResult2.isCompletedExceptionally());
+
+        // Subscription should have been updated only once with the first 
leave group.
+        verify(subscriptionState, 
never()).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testLeaveGroupWhenMemberAlreadyLeft() {
+        MembershipManager membershipManager = createMemberInStableState();
+
+        // Leave group triggered and completed
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult1 = membershipManager.leaveGroup();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        assertTrue(leaveResult1.isDone());
+        assertFalse(leaveResult1.isCompletedExceptionally());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Call to leave group again, when member already left. Should be 
no-op (no callbacks,
+        // no assignment updated)
+        mockLeaveGroup();
+        CompletableFuture<Void> leaveResult2 = membershipManager.leaveGroup();
+        assertTrue(leaveResult2.isDone());
+        assertFalse(leaveResult2.isCompletedExceptionally());
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        verify(subscriptionState, never()).rebalanceListener();
+        verify(subscriptionState, 
never()).assignFromSubscribed(Collections.emptySet());
     }
 
     @Test
     public void testFatalFailureWhenStateIsUnjoined() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        assertEquals(MemberState.JOINING, membershipManager.state());
 
         testStateUpdateOnFatalFailure(membershipManager);
     }
 
     @Test
     public void testFatalFailureWhenStateIsStable() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
 
         testStateUpdateOnFatalFailure(membershipManager);
     }
 
-    @Test
-    public void testFencingShouldNotHappenWhenStateIsUnjoined() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
-
-        // Getting fenced when the member is not part of the group is not 
expected and should
-        // fail with invalid transition.
-        assertThrows(IllegalStateException.class, 
membershipManager::transitionToFenced);
-    }
-
     @Test
     public void testUpdateStateFailsOnResponsesWithErrors() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         // Updating state with a heartbeat response containing errors cannot 
be performed and
         // should fail.
         ConsumerGroupHeartbeatResponse unknownMemberResponse =
                 
createConsumerGroupHeartbeatResponseWithError(Errors.UNKNOWN_MEMBER_ID);
         assertThrows(IllegalArgumentException.class,
-                () -> 
membershipManager.updateState(unknownMemberResponse.data()));
+                () -> 
membershipManager.onHeartbeatResponseReceived(unknownMemberResponse.data()));
     }
 
+    /**
+     * This test should be the case when an assignment is sent to the member, 
and it cannot find
+     * it in metadata (permanently, ex. topic deleted). The member will keep 
the assignment as
+     * waiting for metadata, but the following assignment received from the 
broker will not
+     * contain the deleted topic. The member will discard the assignment that 
was pending and
+     * proceed with the reconciliation of the new assignment.
+     */
     @Test
-    public void testAssignmentUpdatedAsReceivedAndProcessed() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponseData.Assignment newAssignment = 
createAssignment();
-        ConsumerGroupHeartbeatResponse heartbeatResponse =
-                createConsumerGroupHeartbeatResponse(newAssignment);
-        membershipManager.updateState(heartbeatResponse.data());
+    public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
+        MembershipManagerImpl membershipManager = 
mockJoinAndReceiveAssignment(false);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        // When the ack is sent the member should go back to RECONCILING
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertTrue(membershipManager.topicsWaitingForMetadata().size() > 0);
+
+        // New target assignment received while there is another one waiting 
to be resolved
+        // and reconciled. This assignment does not include the previous one 
that is waiting
+        // for metadata, so the member will discard the topics that were 
waiting for metadata, and
+        // reconcile the new assignment.
+        Uuid topicId = Uuid.randomUuid();
+        String topicName = "topic1";
+        
when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, 
topicName));
+        receiveAssignment(topicId, Collections.singletonList(0), 
membershipManager);
+        Set<TopicPartition> expectedAssignment = Collections.singleton(new 
TopicPartition(topicName, 0));
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(expectedAssignment);
+
+        // When ack for the reconciled assignment is sent, member should go 
back to STABLE
+        // because the first assignment that was not resolved should have been 
discarded
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
+        assertTrue(membershipManager.topicsWaitingForMetadata().isEmpty());
+    }
+
+    /**
+     * This test should be the case when an assignment is sent to the member, 
and it cannot find
+     * it in metadata (temporarily). The broker will continue to send the 
assignment to the
+     * member. The member should keep it as waiting for metadata and continue 
to request updates.

Review Comment:
   `The broker will continue to send the assignment to the member.` This is not 
entirely true. The broker may not send anything.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);

Review Comment:
   Is there a reason why we don't do this as the first thing in this method? 
This would be more consistent with transitionToFenced.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only 
ensure that the
+        //  updated subscription is included in the next heartbeat request.
     }
 
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        registerForMetadataUpdates();
+    }
+
+    /**
+     * Register to get notified when the cluster metadata is updated, via the
+     * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
register already.
+     */
+    private void registerForMetadataUpdates() {
+        if (!isRegisteredForMetadataUpdates) {
+            this.metadata.addClusterUpdateListener(this);
+            isRegisteredForMetadataUpdates = true;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public CompletableFuture<Void> leaveGroup() {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+            // Member is not part of the group. No-op and return completed 
future to avoid
+            // unnecessary transitions.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+            // Member already leaving. No-op and return existing leave group 
future that will
+            // complete when the ongoing leave operation completes.
+            return leaveGroupInProgress.get();
+        }
+
+        transitionTo(MemberState.PREPARE_LEAVING);
+        leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return future to indicate that the leave group is done when the 
callbacks
+        // complete, and the heartbeat to be sent out. (Best effort to send 
it, without waiting
+        // for a response or handling timeouts)
+        return leaveGroupInProgress.get();
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());

Review Comment:
   I am a bit confused about where we call 
`clearPendingAssignmentsAndLocalNamesCache`. Sometimes we call it when the 
callback future is completed, sometimes right after scheduling the callback. I 
wonder if it would be possible to be more consistent or if there are specific 
reasons that I did not get.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);

Review Comment:
   nit: Should we also log the member epoch?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;

Review Comment:
   Is this still valid? It looks like we call onPartitionsLost explicitly now.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only 
ensure that the
+        //  updated subscription is included in the next heartbeat request.
     }
 
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        registerForMetadataUpdates();
+    }
+
+    /**
+     * Register to get notified when the cluster metadata is updated, via the
+     * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
registered already.
+     */
+    private void registerForMetadataUpdates() {
+        if (!isRegisteredForMetadataUpdates) {
+            this.metadata.addClusterUpdateListener(this);
+            isRegisteredForMetadataUpdates = true;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public CompletableFuture<Void> leaveGroup() {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+            // Member is not part of the group. No-op and return completed 
future to avoid
+            // unnecessary transitions.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+            // Member already leaving. No-op and return existing leave group 
future that will
+            // complete when the ongoing leave operation completes.
+            return leaveGroupInProgress.get();
+        }
+
+        transitionTo(MemberState.PREPARE_LEAVING);
+        leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();

Review Comment:
   Note that here we clear the `assignedTopicNamesCache` but we only clear the 
assigned partitions after the callback is executed. This means that we have a 
period of time during which we are not able to resolve ids from the names in 
the subscription. I suppose that it does not matter in this case but this could 
be a source of subtle bugs.
   
   As we discussed offline, I think that we really need to update the 
subscriptions to use TopicIdPartitions. If this is not possible, an 
intermediate approach would be to keep the assigned TopicIdPartitions in this 
manager and to update the subscriptions with 
`subscriptions.assignFromSubscribed` when we update it. Or, we could also move 
the bookkeeping of the cache closer to calls to 
`subscriptions.assignFromSubscribed`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only 
ensure that the
+        //  updated subscription is included in the next heartbeat request.
     }
 
+    /**
+     * Transition to the {@link MemberState#JOINING} state, indicating that 
the member will
+     * try to join the group on the next heartbeat request. This is expected 
to be invoked when
+     * the user calls the subscribe API, or when the member wants to rejoin 
after getting fenced.
+     * Visible for testing.
+     */
+    void transitionToJoining() {
+        if (state == MemberState.FATAL) {
+            log.warn("No action taken to join the group with the updated 
subscription because " +
+                    "the member is in FATAL state");
+            return;
+        }
+        resetEpoch();
+        transitionTo(MemberState.JOINING);
+        clearPendingAssignmentsAndLocalNamesCache();
+        registerForMetadataUpdates();
+    }
+
+    /**
+     * Register to get notified when the cluster metadata is updated, via the
+     * {@link #onUpdate(ClusterResource)}. Register only if the manager is not 
registered already.
+     */
+    private void registerForMetadataUpdates() {
+        if (!isRegisteredForMetadataUpdates) {
+            this.metadata.addClusterUpdateListener(this);
+            isRegisteredForMetadataUpdates = true;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
     @Override
-    public boolean shouldSendHeartbeat() {
-        return state() != MemberState.FAILED;
+    public CompletableFuture<Void> leaveGroup() {
+        if (state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL) {
+            // Member is not part of the group. No-op and return completed 
future to avoid
+            // unnecessary transitions.
+            return CompletableFuture.completedFuture(null);
+        }
+
+        if (state == MemberState.PREPARE_LEAVING || state == 
MemberState.LEAVING) {
+            // Member already leaving. No-op and return existing leave group 
future that will
+            // complete when the ongoing leave operation completes.
+            return leaveGroupInProgress.get();
+        }
+
+        transitionTo(MemberState.PREPARE_LEAVING);
+        leaveGroupInProgress = Optional.of(new CompletableFuture<>());
+
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
+        callbackResult.whenComplete((result, error) -> {
+            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+
+            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
+            // group (even in the case where the member had no assignment to 
release or when the
+            // callback execution failed.)
+            transitionToSendingLeaveGroup();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
+
+        // Return future to indicate that the leave group is done when the 
callbacks
+        // complete, and the heartbeat to be sent out. (Best effort to send 
it, without waiting
+        // for a response or handling timeouts)
+        return leaveGroupInProgress.get();
     }
 
     /**
-     * Transition to {@link MemberState#STABLE} only if there are no target 
assignments left to
-     * reconcile. Transition to {@link MemberState#RECONCILING} otherwise.
+     * Release member assignment by calling the user defined callbacks for 
onPartitionsRevoked or
+     * onPartitionsLost.
+     * <ul>
+     *     <li>If the member is part of the group (epoch > 0), this will 
invoke onPartitionsRevoked.
+     *     This will be the case when releasing assignment because the member 
is intentionally
+     *     leaving the group (after a call to unsubscribe)</li>
+     *
+     *     <li>If the member is not part of the group (epoch <=0), this will 
invoke onPartitionsLost.
+     *     This will be the case when releasing assignment after being fenced 
.</li>
+     * </ul>
+     *
+     * @return Future that will complete when the callback execution completes.
      */
-    private boolean maybeTransitionToStable() {
-        if (!hasPendingTargetAssignment()) {
-            transitionTo(MemberState.STABLE);
+    private CompletableFuture<Void> 
invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
+        SortedSet<TopicPartition> droppedPartitions = new 
TreeSet<>(COMPARATOR);
+        droppedPartitions.addAll(subscriptions.assignedPartitions());
+
+        CompletableFuture<Void> callbackResult;
+        if (droppedPartitions.isEmpty()) {
+            // No assignment to release
+            callbackResult = CompletableFuture.completedFuture(null);
         } else {
-            transitionTo(MemberState.RECONCILING);
+            // Release assignment
+            if (memberEpoch > 0) {
+                // Member is part of the group. Invoke onPartitionsRevoked.
+                callbackResult = revokePartitions(droppedPartitions);
+            } else {
+                // Member is not part of the group anymore. Invoke 
onPartitionsLost.
+                callbackResult = 
invokeOnPartitionsLostCallback(droppedPartitions);
+            }
+            // Remove all topic IDs and names from local cache
+            callbackResult.whenComplete((result, error) -> 
clearPendingAssignmentsAndLocalNamesCache());
         }
-        return state.equals(MemberState.STABLE);
+        return callbackResult;
+    }
+
+    /**
+     * Reset member epoch to the value required for the leave the group 
heartbeat request, and
+     * transition to the {@link MemberState#LEAVING} state so that a heartbeat
+     * request is sent out with it.
+     */
+    private void transitionToSendingLeaveGroup() {
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        currentAssignment = new HashSet<>();
+        transitionTo(MemberState.LEAVING);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean shouldHeartbeatNow() {
+        MemberState state = state();
+        return state == MemberState.ACKNOWLEDGING || state == 
MemberState.LEAVING;
     }
 
     /**
-     * Take new target assignment received from the server and set it as 
targetAssignment to be
-     * processed. Following the consumer group protocol, the server won't send 
a new target
-     * member while a previous one hasn't been acknowledged by the member, so 
this will fail
-     * if a target assignment already exists.
+     * {@inheritDoc}
+     */
+    @Override
+    public void onHeartbeatRequestSent() {
+        MemberState state = state();
+        if (state == MemberState.ACKNOWLEDGING) {
+            if (allPendingAssignmentsReconciled()) {
+                transitionTo(MemberState.STABLE);
+            } else {
+                transitionTo(MemberState.RECONCILING);
+            }
+        } else if (state == MemberState.LEAVING) {
+            transitionTo(MemberState.UNSUBSCRIBED);
+            leaveGroupInProgress.get().complete(null);
+            leaveGroupInProgress = Optional.empty();
+        }
+    }
+
+    /**
+     * @return True if there are no assignments waiting to be resolved from 
metadata or reconciled.
+     */
+    private boolean allPendingAssignmentsReconciled() {
+        return assignmentUnresolved.isEmpty() && 
assignmentReadyToReconcile.isEmpty();
+    }
+
+    @Override
+    public boolean shouldSkipHeartbeat() {
+        MemberState state = state();
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+    }
+
+    /**
+     * Reconcile the assignment that has been received from the server and for 
which topic names
+     * are resolved, kept in the {@link #assignmentReadyToReconcile}. This 
will commit if needed,
+     * trigger the callbacks and update the subscription state. Note that only 
one reconciliation
+     * can be in progress at a time. If there is already another one in 
progress when this is
+     * triggered, it will be no-op, and the assignment will be reconciled on 
the next
+     * reconciliation loop.
+     */
+    boolean reconcile() {
+        // Make copy of the assignment to reconcile as it could change as new 
assignments or metadata updates are received
+        SortedSet<TopicPartition> assignedPartitions = new 
TreeSet<>(COMPARATOR);
+        assignedPartitions.addAll(assignmentReadyToReconcile);
+
+        SortedSet<TopicPartition> ownedPartitions = new TreeSet<>(COMPARATOR);
+        ownedPartitions.addAll(subscriptions.assignedPartitions());
+
+        boolean sameAssignmentReceived = 
assignedPartitions.equals(ownedPartitions);
+
+        if (reconciliationInProgress || sameAssignmentReceived) {
+            String reason;
+            if (reconciliationInProgress) {
+                reason = "Another reconciliation is already in progress. 
Assignment " +
+                        assignmentReadyToReconcile + " will be handled in the 
next reconciliation loop.";
+            } else {
+                reason = "Target assignment ready to reconcile is equals to 
the member current assignment.";
+            }
+            log.debug("Ignoring reconciliation attempt. " + reason);
+            return false;
+        }
+
+        markReconciliationInProgress();
+
+        // Partitions to assign (not previously owned)
+        SortedSet<TopicPartition> addedPartitions = new TreeSet<>(COMPARATOR);
+        addedPartitions.addAll(assignedPartitions);
+        addedPartitions.removeAll(ownedPartitions);
+
+        // Partitions to revoke
+        SortedSet<TopicPartition> revokedPartitions = new 
TreeSet<>(COMPARATOR);
+        revokedPartitions.addAll(ownedPartitions);
+        revokedPartitions.removeAll(assignedPartitions);
+
+        log.info("Updating assignment with\n" +
+                        "\tAssigned partitions:                       {}\n" +
+                        "\tCurrent owned partitions:                  {}\n" +
+                        "\tAdded partitions (assigned - owned):       {}\n" +
+                        "\tRevoked partitions (owned - assigned):     {}\n",
+                assignedPartitions,
+                ownedPartitions,
+                addedPartitions,
+                revokedPartitions
+        );
+
+        CompletableFuture<Void> revocationResult;
+        if (!revokedPartitions.isEmpty()) {
+            revocationResult = revokePartitions(revokedPartitions);
+        } else {
+            revocationResult = CompletableFuture.completedFuture(null);
+            // Reschedule the auto commit starting from now (new assignment 
received without any
+            // revocation).
+            commitRequestManager.resetAutoCommitTimer();
+        }
+
+        // Future that will complete when the full reconciliation process 
completes (revocation
+        // and assignment, executed sequentially)
+        CompletableFuture<Void> reconciliationResult =
+                revocationResult.thenCompose(__ -> {
+                    boolean memberHasRejoined = 
!Objects.equals(memberEpochOnReconciliationStart,
+                            memberEpoch);
+                    if (state == MemberState.RECONCILING && 
!memberHasRejoined) {
+                        // Apply assignment
+                        CompletableFuture<Void> assignResult = 
assignPartitions(assignedPartitions, addedPartitions);
+
+                        // Clear topic names cache only for topics that are 
not in the subscription anymore
+                        for (TopicPartition tp : revokedPartitions) {
+                            if 
(!subscriptions.subscription().contains(tp.topic())) {
+                                
assignedTopicNamesCache.values().remove(tp.topic());
+                            }
+                        }
+                        return assignResult;
+                    } else {
+                        String reason;
+                        if (state != MemberState.RECONCILING) {
+                            reason = "The member already transitioned out of 
the reconciling " +
+                                    "state into " + state;
+                        } else {
+                            reason = "The member has re-joined the group.";
+                        }
+                        // Revocation callback completed but the reconciled 
assignment should not
+                        // be applied (not relevant anymore). This could be 
because the member
+                        // is not in the RECONCILING state anymore (fenced, 
failed, unsubscribed),
+                        // or because it has already re-joined the group.
+                        CompletableFuture<Void> res = new 
CompletableFuture<>();
+                        res.completeExceptionally(new 
KafkaException("Interrupting reconciliation" +
+                                " after revocation. " + reason));
+                        return res;
+                    }
+                });
+
+        reconciliationResult.whenComplete((result, error) -> {
+            markReconciliationCompleted();
+            if (error != null) {
+                // Leaving member in RECONCILING state after callbacks fail. 
The member
+                // won't send the ack, and the expectation is that the broker 
will kick the
+                // member out of the group after the rebalance timeout 
expires, leading to a
+                // RECONCILING -> FENCED transition.
+                log.error("Reconciliation failed.", error);
+            } else {
+                if (state == MemberState.RECONCILING) {

Review Comment:
   Do we need to check the epoch here as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -181,63 +359,592 @@ public void 
updateState(ConsumerGroupHeartbeatResponseData response) {
     public void transitionToFenced() {
         resetEpoch();
         transitionTo(MemberState.FENCED);
+
+        // Release assignment
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member got fenced. Member will rejoin the group 
anyways.", error);
+            }
+            subscriptions.assignFromSubscribed(Collections.emptySet());
+            transitionToJoining();
+        });
+
+        clearPendingAssignmentsAndLocalNamesCache();
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void transitionToFailed() {
-        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FAILED);
-        transitionTo(MemberState.FAILED);
+    public void transitionToFatal() {
+        log.error("Member {} transitioned to {} state", memberId, 
MemberState.FATAL);
+
+        // Update epoch to indicate that the member is not in the group 
anymore, so that the
+        // onPartitionsLost is called to release assignment.
+        memberEpoch = ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        CompletableFuture<Void> callbackResult = 
invokeOnPartitionsLostCallback(subscriptions.assignedPartitions());
+        callbackResult.whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("onPartitionsLost callback invocation failed while 
releasing assignment" +
+                        "after member failed with fatal error.", error);
+            }
+        });
+        subscriptions.assignFromSubscribed(Collections.emptySet());
+        clearPendingAssignmentsAndLocalNamesCache();
+        transitionTo(MemberState.FATAL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void onSubscriptionUpdated() {
+        if (state == MemberState.UNSUBSCRIBED) {
+            transitionToJoining();
+        }
+        // TODO: If the member is already part of the group, this should only 
ensure that the
+        //  updated subscription is included in the next heartbeat request.

Review Comment:
   I think that the next HB should pick it up. Otherwise, we could perhaps 
transition to Joining to force an immediate HB. I am not sure that it is worth 
it.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java:
##########
@@ -17,253 +17,1059 @@
 
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
-import org.apache.kafka.common.utils.LogContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anySet;
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class MembershipManagerImplTest {
 
     private static final String GROUP_ID = "test-group";
     private static final String MEMBER_ID = "test-member-1";
     private static final int MEMBER_EPOCH = 1;
-    private final LogContext logContext = new LogContext();
+
+    private SubscriptionState subscriptionState;
+    private ConsumerMetadata metadata;
+
+    private CommitRequestManager commitRequestManager;
+
+    private ConsumerTestBuilder testBuilder;
+
+    @BeforeEach
+    public void setup() {
+        testBuilder = new 
ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
+        metadata = testBuilder.metadata;
+        subscriptionState = testBuilder.subscriptions;
+        commitRequestManager = testBuilder.commitRequestManager.get();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (testBuilder != null) {
+            testBuilder.close();
+        }
+    }
+
+    private MembershipManagerImpl createMembershipManagerJoiningGroup() {
+        MembershipManagerImpl manager = spy(new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager,
+                metadata, testBuilder.logContext));
+        manager.transitionToJoining();
+        return manager;
+    }
+
+    private MembershipManagerImpl createMembershipManagerJoiningGroup(String 
groupInstanceId,
+                                                                      String 
serverAssignor) {
+        MembershipManagerImpl manager = new MembershipManagerImpl(
+                GROUP_ID, Optional.ofNullable(groupInstanceId), 
Optional.ofNullable(serverAssignor),
+                subscriptionState, commitRequestManager, metadata, 
testBuilder.logContext);
+        manager.transitionToJoining();
+        return manager;
+    }
 
     @Test
     public void testMembershipManagerServerAssignor() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-        membershipManager = new MembershipManagerImpl(GROUP_ID, "instance1", 
"Uniform", logContext);
+        membershipManager = createMembershipManagerJoiningGroup("instance1", 
"Uniform");
         assertEquals(Optional.of("Uniform"), 
membershipManager.serverAssignor());
     }
 
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
-        new MembershipManagerImpl(GROUP_ID, logContext);
-        new MembershipManagerImpl(GROUP_ID, null, null, logContext);
+        createMembershipManagerJoiningGroup();
+        createMembershipManagerJoiningGroup(null, null);
+    }
+
+    @Test
+    public void 
testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() {
+        // First join should register to get metadata updates
+        MembershipManagerImpl manager = new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager,
+                metadata, testBuilder.logContext);
+        manager.transitionToJoining();
+        verify(metadata).addClusterUpdateListener(manager);
+        clearInvocations(metadata);
+
+        // Following joins should not register again.
+        receiveEmptyAssignment(manager);
+        mockLeaveGroup();
+        manager.leaveGroup();
+        assertEquals(MemberState.LEAVING, manager.state());
+        manager.onHeartbeatRequestSent();
+        assertEquals(MemberState.UNSUBSCRIBED, manager.state());
+        manager.transitionToJoining();
+        verify(metadata, never()).addClusterUpdateListener(manager);
+    }
+
+    @Test
+    public void testReconcilingWhenReceivingAssignmentFoundInMetadata() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(true);
+        assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // When the ack is sent the member should go back to STABLE
+        membershipManager.onHeartbeatRequestSent();
+        assertEquals(MemberState.STABLE, membershipManager.state());
     }
 
     @Test
     public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        assertEquals(MemberState.UNJOINED, membershipManager.state());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        assertEquals(MemberState.JOINING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithoutAssignment =
                 createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(responseWithoutAssignment.data());
+        
membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data());
         assertNotEquals(MemberState.RECONCILING, membershipManager.state());
 
         ConsumerGroupHeartbeatResponse responseWithAssignment =
-                createConsumerGroupHeartbeatResponse(createAssignment());
-        membershipManager.updateState(responseWithAssignment.data());
+                createConsumerGroupHeartbeatResponse(createAssignment(true));
+        
membershipManager.onHeartbeatResponseReceived(responseWithAssignment.data());
         assertEquals(MemberState.RECONCILING, membershipManager.state());
     }
 
     @Test
     public void testMemberIdAndEpochResetOnFencedMembers() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse =
-                createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
+        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
+        mockMemberHasAutoAssignedPartition();
+
         membershipManager.transitionToFenced();
-        assertFalse(membershipManager.memberId().isEmpty());
+        assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(0, membershipManager.memberEpoch());
     }
 
     @Test
-    public void testTransitionToFailure() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
+    public void testTransitionToFatal() {
+        MembershipManagerImpl membershipManager = 
createMembershipManagerJoiningGroup();
         ConsumerGroupHeartbeatResponse heartbeatResponse =
                 createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        
membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
         assertEquals(MemberState.STABLE, membershipManager.state());
         assertEquals(MEMBER_ID, membershipManager.memberId());
         assertEquals(MEMBER_EPOCH, membershipManager.memberEpoch());
 
-        membershipManager.transitionToFailed();
-        assertEquals(MemberState.FAILED, membershipManager.state());
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testTransitionToFailedWhenTryingToJoin() {
+        MembershipManagerImpl membershipManager = new MembershipManagerImpl(
+                GROUP_ID, subscriptionState, commitRequestManager, metadata,
+                testBuilder.logContext);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.transitionToJoining();
+
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        assertEquals(MemberState.FATAL, membershipManager.state());
     }
 
     @Test
     public void testFencingWhenStateIsStable() {
-        MembershipManagerImpl membershipManager = new 
MembershipManagerImpl(GROUP_ID, logContext);
-        ConsumerGroupHeartbeatResponse heartbeatResponse = 
createConsumerGroupHeartbeatResponse(null);
-        membershipManager.updateState(heartbeatResponse.data());
+        MembershipManager membershipManager = createMemberInStableState();
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    @Test
+    public void testFencingWhenStateIsReconciling() {
+        MembershipManager membershipManager = 
mockJoinAndReceiveAssignment(false);
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling and transitions out 
of the RECONCILING
+     * state (due to failure). When the reconciliation completes it should not 
be applied because
+     * it is not relevant anymore (it should not update the assignment on the 
member or send ack).
+     */
+    @Test
+    public void 
testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        Set<TopicPartition> owned = Collections.singleton(new 
TopicPartition(topic1, 0));
+        mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true);
+
+        // Reconciliation that does not complete because it is stuck on the revocation commit.
+        CompletableFuture<Void> commitResult = 
mockEmptyAssignmentAndRevocationStuckOnCommit(membershipManager);
+
+        // Member received fatal error while reconciling
+        when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
+        membershipManager.transitionToFatal();
+        verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
+        clearInvocations(subscriptionState);
+
+        // Complete commit request
+        commitResult.complete(null);
+
+        // Member should not update the subscription or send ack when the 
delayed reconciliation
+        // completed.
+        verify(subscriptionState, never()).assignFromSubscribed(anySet());
+        assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling an assignment A 
(waiting on
+     * metadata, commit or callbacks), and it rejoins (due to fence or 
unsubscribe/subscribe). If
+     * the reconciliation of A completes it should not be applied (it should 
not update the
+     * assignment on the member or send ack).
+     */
+    @Test
+    public void testDelayedReconciliationResultDiscardedIfMemberRejoins() {
+        MembershipManagerImpl membershipManager = createMemberInStableState();
+        Uuid topicId1 = Uuid.randomUuid();
+        String topic1 = "topic1";
+        Set<TopicPartition> owned = Collections.singleton(new 
TopicPartition(topic1, 0));
+        mockOwnedPartitionAndAssignmentReceived(topicId1, topic1, owned, true);
+
+        // Reconciliation that does not complete because it is stuck on the revocation commit.
+        CompletableFuture<Void> commitResult =
+                mockNewAssignmentAndRevocationStuckOnCommit(membershipManager, 
topicId1, topic1,
+                        Arrays.asList(1, 2), true);
+        Set<TopicPartition> assignment1 = new HashSet<>();
+        assignment1.add(new TopicPartition(topic1, 1));
+        assignment1.add(new TopicPartition(topic1, 2));
+        assertEquals(assignment1, 
membershipManager.assignmentReadyToReconcile());
+        int currentEpoch = membershipManager.memberEpoch();
+
+        // Get fenced and rejoin while still reconciling. Get new assignment 
to reconcile after
+        // rejoining.
+        
testFencedMemberReleasesAssignmentAndTransitionsToJoining(membershipManager);
+        clearInvocations(subscriptionState);
+
+        // Get new assignment A2 after rejoining. This should not trigger a 
reconciliation just
+        // yet because there is another one in progress, but should keep the 
new assignment ready
+        // to be reconciled next.
+        Uuid topicId3 = Uuid.randomUuid();
+        mockOwnedPartitionAndAssignmentReceived(topicId3, "topic3", owned, 
true);
+        receiveAssignmentAfterRejoin(topicId3, Collections.singletonList(5), 
membershipManager);
+        verifyReconciliationNotTriggered(membershipManager);
+        Set<TopicPartition> assignmentAfterRejoin = Collections.singleton(new 
TopicPartition("topic3", 5));
+        assertEquals(assignmentAfterRejoin, 
membershipManager.assignmentReadyToReconcile());
+
+        // Reconciliation completes when the member has already re-joined the 
group. Should not
+        // update the subscription state or send ack.
+        commitResult.complete(null);
+        verify(subscriptionState, 
never()).assignFromSubscribed(anyCollection());
+        assertNotEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
+
+        // Assignment received after rejoining should be ready to reconcile on 
the next
+        // reconciliation loop.
+        assertEquals(MemberState.RECONCILING, membershipManager.state());
+        assertEquals(assignmentAfterRejoin, 
membershipManager.assignmentReadyToReconcile());
+    }
+
+    /**
+     * This is the case where a member is stuck reconciling an assignment A 
(waiting on
+     * metadata, commit or callbacks), and the target assignment changes (due 
to new topics added
+     * to metadata, or new assignment received from broker). If the 
reconciliation of A completes
+     * t should be applied (should update the assignment on the member and 
send ack), and then

Review Comment:
   nit: `t` -> `it`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to