This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c639b18e8ed MINOR: added retries if removeMembersFromConsumerGroup
failed with UnknownMemberIdException (#21074)
c639b18e8ed is described below
commit c639b18e8ed37b7578747f9b66471edccf657179
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Wed Dec 10 16:44:47 2025 -0800
MINOR: added retries if removeMembersFromConsumerGroup failed with
UnknownMemberIdException (#21074)
While we are trying to delete members from a group, they may leave the
group on their own. That would make the tool fail, even though the
member no longer being in the group is exactly the desired state.
Reviewers: Matthias J. Sax <[email protected]>, Kirk True
<[email protected]>, Genseric Ghiro <[email protected]>
---
.../org/apache/kafka/tools/StreamsResetter.java | 62 +++++++----
.../apache/kafka/tools/StreamsResetterTest.java | 118 ++++++++++++++++++++-
2 files changed, 156 insertions(+), 24 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
index 08fc1d590e8..babeef3e45f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/StreamsResetter.java
@@ -27,10 +27,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Exit;
@@ -116,6 +118,8 @@ public class StreamsResetter {
+ "*** Warning! This tool makes irreversible changes to your
application. It is strongly recommended that "
+ "you run this once with \"--dry-run\" to preview your changes
before making them.\n\n";
+ private static final int MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES =
3;
+
private final List<String> allTopics = new LinkedList<>();
public static void main(final String[] args) {
@@ -149,7 +153,7 @@ public class StreamsResetter {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServerValue);
try (Admin adminClient = Admin.create(properties)) {
- maybeDeleteActiveConsumers(groupId, adminClient, options);
+ maybeDeleteActiveConsumers(groupId, adminClient,
options.hasForce());
allTopics.clear();
allTopics.addAll(adminClient.listTopics().names().get(60,
TimeUnit.SECONDS));
@@ -177,30 +181,42 @@ public class StreamsResetter {
}
}
- private void maybeDeleteActiveConsumers(final String groupId,
- final Admin adminClient,
- final StreamsResetterOptions
options)
+ // visible for testing
+ void maybeDeleteActiveConsumers(final String groupId,
+ final Admin adminClient,
+ final boolean force)
throws ExecutionException, InterruptedException {
- final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
- Set.of(groupId),
- new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
- try {
- final List<MemberDescription> members =
- new
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
- if (!members.isEmpty()) {
- if (options.hasForce()) {
- System.out.println("Force deleting all active members in
the group: " + groupId);
- adminClient.removeMembersFromConsumerGroup(groupId, new
RemoveMembersFromConsumerGroupOptions()).all().get();
- } else {
- throw new IllegalStateException("Consumer group '" +
groupId + "' is still active "
- + "and has following members: " + members + ". "
- + "Make sure to stop all running application instances
before running the reset tool."
- + " You can use option '--force' to remove active
members from the group.");
+ int retries = 0;
+ while (true) {
+ final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(
+ Set.of(groupId),
+ new DescribeConsumerGroupsOptions().timeoutMs(10 * 1000));
+ try {
+ final List<MemberDescription> members =
+ new
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
+ if (!members.isEmpty()) {
+ if (force) {
+ System.out.println("Force deleting all active members
in the group: " + groupId);
+ adminClient.removeMembersFromConsumerGroup(groupId,
new RemoveMembersFromConsumerGroupOptions()).all().get();
+ } else {
+ throw new IllegalStateException("Consumer group '" +
groupId + "' is still active "
+ + "and has following members: " + members + ".
"
+ + "Make sure to stop all running application
instances before running the reset tool."
+ + " You can use option '--force' to remove
active members from the group.");
+ }
+ }
+ break;
+ } catch (ExecutionException ee) {
+ // If the group ID is not found, this is not an error case
+ if (ee.getCause() instanceof GroupIdNotFoundException) {
+ break;
+ }
+ // if a member is unknown, it may mean that it left the group
itself. Retrying to confirm.
+ if (ee.getCause() instanceof KafkaException ke &&
ke.getCause() instanceof UnknownMemberIdException) {
+ if (retries++ <
MAX_REMOVE_MEMBERS_FROM_CONSUMER_GROUP_RETRIES) {
+ continue;
+ }
}
- }
- } catch (ExecutionException ee) {
- // If the group ID is not found, this is not an error case
- if (!(ee.getCause() instanceof GroupIdNotFoundException)) {
throw ee;
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
index ef8875f7c49..020b7c1250f 100644
--- a/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/StreamsResetterTest.java
@@ -16,21 +16,33 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ConsumerGroupDescription;
+import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.LeaveGroupRequestData;
+import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.lang.reflect.Constructor;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
@@ -41,7 +53,15 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@Timeout(value = 600)
public class StreamsResetterTest {
@@ -293,6 +313,102 @@ public class StreamsResetterTest {
assertEquals(beginningAndEndOffset, position);
}
+ @Test
+ public void shouldRetryToRemoveMembersOnUnknownMemberIdExceptionAndForce()
throws Exception {
+ final String groupId = "groupId";
+
+ final Admin adminClient = mock(Admin.class);
+ final ConsumerGroupDescription consumerGroupDescription =
mock(ConsumerGroupDescription.class);
+
+ when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+ .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId,
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+ when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new
LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()))
+
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()),
Set.of()));
+
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+ streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+ verify(adminClient,
times(2)).removeMembersFromConsumerGroup(eq(groupId), any());
+ }
+
+ @Test
+ public void shouldFailAfterTooManyRetries() throws Exception {
+ final String groupId = "groupId";
+
+ final Admin adminClient = mock(Admin.class);
+ final ConsumerGroupDescription consumerGroupDescription =
mock(ConsumerGroupDescription.class);
+
+ when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+ .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId,
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+ when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of(new
LeaveGroupRequestData.MemberIdentity(), Errors.UNKNOWN_MEMBER_ID)), Set.of()));
+
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+ assertThrows(ExecutionException.class, () ->
streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true));
+
+ verify(adminClient,
times(4)).removeMembersFromConsumerGroup(eq(groupId), any());
+ }
+
+ @Test
+ public void shouldFailIfThereAreMembersAndNotForce() throws Exception {
+ final String groupId = "groupId";
+
+ final Admin adminClient = mock(Admin.class);
+ final ConsumerGroupDescription consumerGroupDescription =
mock(ConsumerGroupDescription.class);
+
+ when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+ .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId,
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+ assertThrows(IllegalStateException.class, () ->
streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, false));
+
+ verify(adminClient,
never()).removeMembersFromConsumerGroup(eq(groupId), any());
+ }
+
+ @Test
+ public void shouldRemoveIfThereAreMembersAndForce() throws Exception {
+ final String groupId = "groupId";
+
+ final Admin adminClient = mock(Admin.class);
+ final ConsumerGroupDescription consumerGroupDescription =
mock(ConsumerGroupDescription.class);
+
+ when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+ .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId,
KafkaFutureImpl.completedFuture(consumerGroupDescription))));
+ when(adminClient.removeMembersFromConsumerGroup(eq(groupId), any()))
+
.thenReturn(createRemoveMembersFromConsumerGroupResult(KafkaFutureImpl.completedFuture(Map.of()),
Set.of()));
+
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+ streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+ verify(adminClient).removeMembersFromConsumerGroup(eq(groupId), any());
+ }
+
+ @Test
+ public void shouldIgnoreGroupIdNotFoundException() throws Exception {
+ final String groupId = "groupId";
+
+ final Admin adminClient = mock(Admin.class);
+ final ConsumerGroupDescription consumerGroupDescription =
mock(ConsumerGroupDescription.class);
+
+ final KafkaFutureImpl<ConsumerGroupDescription> future = new
KafkaFutureImpl<>();
+ future.completeExceptionally(new GroupIdNotFoundException(groupId));
+ when(adminClient.describeConsumerGroups(eq(Set.of(groupId)), any()))
+ .thenReturn(new DescribeConsumerGroupsResult(Map.of(groupId,
future)));
+
when(consumerGroupDescription.members()).thenReturn(List.of(mock(MemberDescription.class)));
+
+ streamsResetter.maybeDeleteActiveConsumers(groupId, adminClient, true);
+
+ verify(adminClient,
never()).removeMembersFromConsumerGroup(eq(groupId), any());
+ }
+
+ private RemoveMembersFromConsumerGroupResult
createRemoveMembersFromConsumerGroupResult(final
KafkaFuture<Map<LeaveGroupRequestData.MemberIdentity, Errors>> future,
+
final Set<MemberToRemove> memberInfos) throws Exception {
+ final Constructor<RemoveMembersFromConsumerGroupResult> constructor =
RemoveMembersFromConsumerGroupResult.class.getDeclaredConstructor(KafkaFuture.class,
Set.class);
+ constructor.setAccessible(true);
+ return constructor.newInstance(future, memberInfos);
+ }
+
private Cluster createCluster(final int numNodes) {
final HashMap<Integer, Node> nodes = new HashMap<>();
for (int i = 0; i < numNodes; ++i) {
@@ -315,4 +431,4 @@ public class StreamsResetterTest {
return topicPartitionToOffsetAndTimestamp;
}
}
-}
\ No newline at end of file
+}