This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d0ec4d7  KAFKA-13214; Consumer should not reset state after retriable error in rebalance (#11231)
d0ec4d7 is described below

commit d0ec4d7ebf0022537807f5080b19414bd81f706e
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Aug 24 11:59:38 2021 -0700

    KAFKA-13214; Consumer should not reset state after retriable error in rebalance (#11231)
    
    Currently the consumer resets its state after any retriable error during a rebalance, including both coordinator disconnects and coordinator changes. Resetting discards the memberId, so the consumer rejoins the group as a new member. As a result, rebalances are delayed: they stay blocked until the session timeout of the old memberId expires.
    
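    This patch fixes the problem by no longer resetting the member state after a retriable error. Instead, the consumer only marks that a rejoin is needed and, after the retry backoff, rejoins with its existing memberId.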
    
    Reviewers: David Jacot <[email protected]>, Guozhang Wang <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java    |   2 +-
 .../internals/AbstractCoordinatorTest.java         | 134 +++++++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java         |  20 ++-
 3 files changed, 149 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 223c4fe..2118861 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -467,6 +467,7 @@ public abstract class AbstractCoordinator implements Closeable {
                 final RuntimeException exception = future.exception();
 
                 resetJoinGroupFuture();
+                rejoinNeeded = true;
 
                 if (exception instanceof UnknownMemberIdException ||
                     exception instanceof IllegalGenerationException ||
@@ -476,7 +477,6 @@ public abstract class AbstractCoordinator implements Closeable {
                 else if (!future.isRetriable())
                     throw exception;
 
-                resetStateAndRejoin(String.format("rebalance failed with 
retriable error %s", exception));
                 timer.sleep(rebalanceConfig.retryBackoffMs);
             }
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index b97fb78..384ba91 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.message.JoinGroupRequestData;
 import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
+import org.apache.kafka.common.message.SyncGroupRequestData;
 import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -431,6 +432,139 @@ public class AbstractCoordinatorTest {
     }
 
     @Test
+    public void testRetainMemberIdAfterJoinGroupDisconnect() {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // Rebalance once to initialize the generation and memberId
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        expectJoinGroup("", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+
+        // Force a rebalance
+        coordinator.requestRejoin("Manual test trigger");
+        assertTrue(coordinator.rejoinNeededOrPending());
+
+        // Disconnect during the JoinGroup and ensure that the retry preserves 
the memberId
+        int rejoinedGeneration = 10;
+        expectDisconnectInJoinGroup(memberId);
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        expectJoinGroup(memberId, rejoinedGeneration, memberId);
+        expectSyncGroup(rejoinedGeneration, memberId);
+        ensureActiveGroup(rejoinedGeneration, memberId);
+    }
+
+    @Test
+    public void testRetainMemberIdAfterSyncGroupDisconnect() {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // Rebalance once to initialize the generation and memberId
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        expectJoinGroup("", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+
+        // Force a rebalance
+        coordinator.requestRejoin("Manual test trigger");
+        assertTrue(coordinator.rejoinNeededOrPending());
+
+        // Disconnect during the SyncGroup and ensure that the retry preserves 
the memberId
+        int rejoinedGeneration = 10;
+        expectJoinGroup(memberId, rejoinedGeneration, memberId);
+        expectDisconnectInSyncGroup(rejoinedGeneration, memberId);
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+
+        // Note that the consumer always starts from JoinGroup after a failed 
rebalance
+        expectJoinGroup(memberId, rejoinedGeneration, memberId);
+        expectSyncGroup(rejoinedGeneration, memberId);
+        ensureActiveGroup(rejoinedGeneration, memberId);
+    }
+
+    private void ensureActiveGroup(
+        int generation,
+        String memberId
+    ) {
+        coordinator.ensureActiveGroup();
+        assertEquals(generation, coordinator.generation().generationId);
+        assertEquals(memberId, coordinator.generation().memberId);
+        assertFalse(coordinator.rejoinNeededOrPending());
+    }
+
+    private void expectSyncGroup(
+        int expectedGeneration,
+        String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+            SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) 
body).data();
+            return syncGroupRequest.generationId() == expectedGeneration
+                && syncGroupRequest.memberId().equals(expectedMemberId)
+                && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
+                && syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
+        }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, PROTOCOL_NAME));
+    }
+
+    private void expectDisconnectInSyncGroup(
+        int expectedGeneration,
+        String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+            SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) 
body).data();
+            return syncGroupRequest.generationId() == expectedGeneration
+                && syncGroupRequest.memberId().equals(expectedMemberId)
+                && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
+                && syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
+        }, null, true);
+    }
+
+    private void expectDisconnectInJoinGroup(
+        String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) 
body).data();
+            return joinGroupRequest.memberId().equals(expectedMemberId)
+                && joinGroupRequest.protocolType().equals(PROTOCOL_TYPE);
+        }, null, true);
+    }
+
+    private void expectJoinGroup(
+        String expectedMemberId,
+        int responseGeneration,
+        String responseMemberId
+    ) {
+        JoinGroupResponse response = joinGroupFollowerResponse(
+            responseGeneration,
+            responseMemberId,
+            "leaderId",
+            Errors.NONE,
+            PROTOCOL_TYPE
+        );
+
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof JoinGroupRequest)) {
+                return false;
+            }
+            JoinGroupRequestData joinGroupRequest = ((JoinGroupRequest) 
body).data();
+            return joinGroupRequest.memberId().equals(expectedMemberId)
+                && joinGroupRequest.protocolType().equals(PROTOCOL_TYPE);
+        }, response);
+    }
+
+    @Test
     public void testNoGenerationWillNotTriggerProtocolNameCheck() {
         final String wrongProtocolName = "wrong-name";
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 89bd68e..902ea99 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -1037,7 +1037,14 @@ public abstract class ConsumerCoordinatorTest {
         coordinator.poll(time.timer(0));
         assertTrue(coordinator.rejoinNeededOrPending());
 
-        client.respond(joinGroupLeaderResponse(2, consumerId, 
initialSubscription, Errors.NONE));
+        client.respond(request -> {
+            if (!(request instanceof JoinGroupRequest)) {
+                return false;
+            } else {
+                JoinGroupRequest joinRequest = (JoinGroupRequest) request;
+                return consumerId.equals(joinRequest.data().memberId());
+            }
+        }, joinGroupLeaderResponse(2, consumerId, initialSubscription, 
Errors.NONE));
         client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
         coordinator.poll(time.timer(Long.MAX_VALUE));
 
@@ -1045,12 +1052,13 @@ public abstract class ConsumerCoordinatorTest {
         Collection<TopicPartition> revoked = getRevoked(partitions, 
partitions);
         assertEquals(revoked.isEmpty() ? 0 : 1, 
rebalanceListener.revokedCount);
         assertEquals(revoked.isEmpty() ? null : revoked, 
rebalanceListener.revoked);
-        Collection<TopicPartition> lost = getLost(partitions);
-        assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
-        assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
+        // No partitions have been lost since the rebalance failure was not 
fatal
+        assertEquals(0, rebalanceListener.lostCount);
+        assertNull(rebalanceListener.lost);
+
+        Collection<TopicPartition> added = getAdded(partitions, partitions);
         assertEquals(2, rebalanceListener.assignedCount);
-        // Since onPartitionsLost is invoked when the JoinGroup failed, all 
owned partitions have to be re-added
-        assertEquals(toSet(partitions), rebalanceListener.assigned);
+        assertEquals(added.isEmpty() ? Collections.emptySet() : 
toSet(partitions), rebalanceListener.assigned);
         assertEquals(toSet(partitions), subscriptions.assignedPartitions());
     }
 

Reply via email to