This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c639b18e8ed MINOR: added retries if removeMembersFromConsumerGroup failed with UnknownMemberIdException (#21074)
c639b18e8ed is described below

commit c639b18e8ed37b7578747f9b66471edccf657179
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Dec 10 16:44:47 2025 -0800

    MINOR: added retries if removeMembersFromConsumerGroup failed with UnknownMemberIdException (#21074)
    
    While we are trying to delete members from a group, they may leave the
    group on their own. This would make the tool fail, even though the
    member's absence from the group is exactly the desired state.
    
    Reviewers: Matthias J. Sax <[email protected]>, Kirk True <[email protected]>, Genseric Ghiro <[email protected]>
---
 .../org/apache/kafka/tools/StreamsResetter.java    |  62 +++++++----
 .../apache/kafka/tools/StreamsResetterTest.java    | 118 ++++++++++++++++++++-
 2 files changed, 156 insertions(+), 24 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java 
b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index 08fc1d590e8..babeef3e45f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -27,10 +27,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Exit;
@@ -116,6 +118,8 @@ public class StreamsResetter {
             + "*** Warning! This tool makes irreversible changes to your 
application. It is strongly recommended that "
             + "you run this once with \"--dry-run\" to preview your changes 
before making them.\n\n";
 
+    private static final int MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES = 
3;
+
     private final List<String> allTopics = new LinkedList<>();
 
     public static void main(final String[] args) {
@@ -149,7 +153,7 @@ public class StreamsResetter {
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServerValue);
 
             try (Admin adminClient = Admin.create(properties)) {
-                maybeDeleteActiveConsumers(groupId, adminClient, options);
+                maybeDeleteActiveConsumers(groupId, adminClient, 
options.hasForce());
 
                 allTopics.clear();
                 allTopics.addAll(adminClient.listTopics().names().get(60, 
TimeUnit.SECONDS));
@@ -177,30 +181,42 @@ public class StreamsResetter {
         }
     }
 
-    private void maybeDeleteActiveConsumers(final String groupId,
-                                            final Admin adminClient,
-                                            final StreamsResetterOptions 
options)
+    // visible for testing
+    void maybeDeleteActiveConsumers(final String groupId,
+                                    final Admin adminClient,
+                                    final boolean force)
         throws ExecutionException, InterruptedException {
-        final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(
-            Set.of(groupId),
-            new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
-        try {
-            final List<MemberDescription> members =
-                new 
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
-            if (!members.isEmpty()) {
-                if (options.hasForce()) {
-                    System.out.println("Force deleting all active members in 
the group: " + groupId);
-                    adminClient.removeMembersFromConsumerGroup(groupId, new 
RemoveMembersFromConsumerGroupOptions()).all().get();
-                } else {
-                    throw new IllegalStateException("Consumer group '" + 
groupId + "' is still active "
-                        + "and has following members: " + members + ". "
-                        + "Make sure to stop all running application instances 
before running the reset tool."
-                        + " You can use option '--force' to remove active 
members from the group.");
+        int retries = 0;
+        while (true) {
+            final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(
+                    Set.of(groupId),
+                    new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
+            try {
+                final List<MemberDescription> members =
+                        new 
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+                if (!members.isEmpty()) {
+                    if (force) {
+                        System.out.println("Force deleting all active members 
in the group: " + groupId);
+                        adminClient.removeMembersFromConsumerGroup(groupId, 
new RemoveMembersFromConsumerGroupOptions()).all().get();
+                    } else {
+                        throw new IllegalStateException("Consumer group '" + 
groupId + "' is still active "
+                                + "and has following members: " + members + ". 
"
+                                + "Make sure to stop all running application 
instances before running the reset tool."
+                                + " You can use option '--force' to remove 
active members from the group.");
+                    }
+                }
+                break;
+            } catch (ExecutionException ee) {
+                // If the group ID is not found, this is not an error case
+                if (ee.getCause() instanceof GroupIdNotFoundException) {
+                    break;
+                }
+                // if a member is unknown, it may mean that it left the group 
itself. Retrying to confirm.
+                if (ee.getCause() instanceof KafkaException ke && 
ke.getCause() instanceof UnknownMemberIdException) {
+                    if (retries++ < 
MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES) {
+                        continue;
+                    }
                 }
-            }
-        } catch (ExecutionException ee) {
-            // If the group ID is not found, this is not an error case
-            if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
                 throw ee;
             }
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java 
b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
index ef8875f7c49..020b7c1250f 100644
--- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
@@ -16,21 +16,33 @@
  */
 package org.apache.kafka.tools;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.MemberToRemove;
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.protocol.Errors;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import java.lang.reflect.Constructor;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -41,7 +53,15 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @Timeout(value = 600)
 public class StreamsResetterTest {
@@ -293,6 +313,102 @@ public class StreamsResetterTest {
         assertEquals(beginningAndEndOffset, position);
     }
 
+    @Test
+    public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce() 
throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = 
mock(ConsumerGroupDescription.class);
+
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, 
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+        when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+                
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new
 LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()))
+                
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()),
 Set.of()));
+        
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+        verify(adminClient, 
times(2)).removeMembersFromConsumerGroup(eq(groupId), any());
+    }
+
+    @Test
+    public void shouldFailAfterTooManyRetries() throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = 
mock(ConsumerGroupDescription.class);
+
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, 
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+        when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+                
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new
 LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()));
+        
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        assertThrows(ExecutionException.class, () -> 
streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true));
+
+        verify(adminClient, 
times(4)).removeMembersFromConsumerGroup(eq(groupId), any());
+    }
+
+    @Test
+    public void shouldFailIfThereAreMembersAndNotForce() throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = 
mock(ConsumerGroupDescription.class);
+
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, 
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+        
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        assertThrows(IllegalStateException.class, () -> 
streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false));
+
+        verify(adminClient, 
never()).removeMembersFromConsumerGroup(eq(groupId), any());
+    }
+
+    @Test
+    public void shouldRemoveIfThereAreMembersAndForce() throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = 
mock(ConsumerGroupDescription.class);
+
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, 
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+        when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+                
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()),
 Set.of()));
+        
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+        verify(adminClient).removeMembersFromConsumerGroup(eq(groupId), any());
+    }
+
+    @Test
+    public void shouldIgnoreGroupIdNotFoundException() throws Exception {
+        final String groupId = "groupId";
+
+        final Admin adminClient = mock(Admin.class);
+        final ConsumerGroupDescription consumerGroupDescription = 
mock(ConsumerGroupDescription.class);
+
+        final KafkaFutureImpl<ConsumerGroupDescription> future = new 
KafkaFutureImpl<>();
+        future.completeExceptionally(new GroupIdNotFoundException(groupId));
+        when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+                .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId, 
future)));
+        
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+        streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+        verify(adminClient, 
never()).removeMembersFromConsumerGroup(eq(groupId), any());
+    }
+
+    private RemoveMembersFromConsumerGroupResult 
createRemoveMembersFromConsumerGroupResult(final 
KafkaFuture<Map<LeaveGroupRequestData.MemberIdentity, Errors>> future,
+                                                                               
             final Set<MemberToRemove> memberInfos) throws Exception {
+        final Constructor<RemoveMembersFromConsumerGroupResult> constructor = 
RemoveMembersFromConsumerGroupResult.class.getDeclaredConstructor(KafkaFuture.class,
 Set.class);
+        constructor.setAccessible(true);
+        return constructor.newInstance(future, memberInfos);
+    }
+
     private Cluster createCluster(final int numNodes) {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         for (int i = 0; i < numNodes; ++i) {
@@ -315,4 +431,4 @@ public class StreamsResetterTest {
             return topicPartitionToOffsetAndTimestamp;
         }
     }
-}
\ No newline at end of file
+}

Reply via email to