This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fa17bd44b30 KAFKA-16718-4/n: ShareGroupCommand changes for
DeleteShareGroupOffsets admin call (#19587)
fa17bd44b30 is described below
commit fa17bd44b30702fdfa393ae8ce7ab6e3a1f884d8
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Wed Apr 30 20:49:11 2025 +0530
KAFKA-16718-4/n: ShareGroupCommand changes for DeleteShareGroupOffsets
admin call (#19587)
This PR is the last in the series to implement the DeleteShareGroupOffsets
request. It includes the changes in ShareGroupCommand, which internally
calls the Admin API to delete the offsets. Now, any end user will be able
to delete share group offsets for topics subscribed to by a share group
using the kafka-share-groups.sh --delete-offsets command.
Reviewers: Andrew Schofield <[email protected]>
---
.../tools/consumer/group/ShareGroupCommand.java | 84 ++++++++-
.../consumer/group/ShareGroupCommandOptions.java | 4 +-
.../consumer/group/ShareGroupCommandTest.java | 205 +++++++++++++++++++++
3 files changed, 290 insertions(+), 3 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index dcebff0d3ea..cfe3fee5812 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -19,6 +19,8 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.GroupListing;
@@ -33,6 +35,7 @@ import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -89,7 +92,7 @@ public class ShareGroupCommand {
} else if (opts.options.has(opts.resetOffsetsOpt)) {
throw new UnsupportedOperationException("--reset-offsets
option is not yet implemented");
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
- throw new UnsupportedOperationException("--delete-offsets
option is not yet implemented");
+ shareGroupService.deleteOffsets();
}
} catch (IllegalArgumentException e) {
CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
@@ -285,6 +288,85 @@ public class ShareGroupCommand {
return failed;
}
+ void deleteOffsets() {
+ String groupId = opts.options.valueOf(opts.groupOpt);
+ List<String> topics = opts.options.valuesOf(opts.topicOpt);
+
+ Entry<Throwable, Map<String, Throwable>> res =
sendDeleteShareGroupOffsetsRequest(groupId, new HashSet<>(topics));
+
+ Throwable topLevelResult = res.getKey();
+ Map<String, Throwable> topicLevelResult = res.getValue();
+
+ if (topLevelResult != null) {
+ Errors topLevelError = Errors.forException(topLevelResult);
+ switch (topLevelError) {
+ case INVALID_GROUP_ID:
+ case GROUP_ID_NOT_FOUND:
+ case GROUP_AUTHORIZATION_FAILED:
+ case NON_EMPTY_GROUP:
+ printError(topLevelResult.getMessage(),
Optional.empty());
+ break;
+ case TOPIC_AUTHORIZATION_FAILED:
+ case UNKNOWN_TOPIC_OR_PARTITION:
+ // These are expected topic-level errors which will be
reported in the topic-level results
+ break;
+ default:
+ printError("Encounter some unknown error: " +
topLevelResult, Optional.empty());
+ }
+ }
+
+ if (topicLevelResult != null && !topicLevelResult.isEmpty()) {
+ int maxTopicLen = 15;
+ for (String topic : topicLevelResult.keySet()) {
+ maxTopicLen = Math.max(maxTopicLen, topic.length());
+ }
+
+ String format = "%n%" + (-maxTopicLen) + "s %s";
+
+ System.out.printf(format, "TOPIC", "STATUS");
+ topicLevelResult.entrySet().stream()
+ .sorted(Entry.comparingByKey())
+ .forEach(e -> {
+ String topic = e.getKey();
+ Throwable error = e.getValue();
+ System.out.printf(format,
+ topic,
+ error != null ? "Error: " + error.getMessage() :
"Successful"
+ );
+ });
+ }
+
+ System.out.println();
+ }
+
+ Entry<Throwable, Map<String, Throwable>>
sendDeleteShareGroupOffsetsRequest(String groupId, Set<String> topics) {
+ Map<String, Throwable> topicLevelResult = new HashMap<>();
+
+ DeleteShareGroupOffsetsResult deleteResult =
adminClient.deleteShareGroupOffsets(
+ groupId,
+ new HashSet<>(topics),
+ withTimeoutMs(new DeleteShareGroupOffsetsOptions()));
+
+ Throwable topLevelException = null;
+
+ try {
+ deleteResult.all().get();
+ } catch (ExecutionException | InterruptedException e) {
+ topLevelException = e.getCause();
+ }
+
+ topics.forEach(topic -> {
+ try {
+ deleteResult.topicResult(topic).get();
+ topicLevelResult.put(topic, null);
+ } catch (ExecutionException | InterruptedException e) {
+ topicLevelResult.put(topic, e.getCause());
+ }
+ });
+
+ return new SimpleImmutableEntry<>(topLevelException,
topicLevelResult);
+ }
+
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
return options.timeoutMs(t);
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index 3ba0a707ee5..be99d2946a7 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -145,7 +145,7 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
.availableIf(describeOpt);
allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt,
allGroupsOpt));
- allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt,
describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt));
+ allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt,
describeOpt, deleteOpt, resetOffsetsOpt));
allResetOffsetScenarioOpts = new
HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt,
resetToLatestOpt));
allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt,
topicOpt));
@@ -208,7 +208,7 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
}
CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
minus(allGroupSelectionScopeOpts, groupOpt));
- CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
minus(allShareGroupLevelOpts, describeOpt, deleteOpt, deleteOffsetsOpt,
resetOffsetsOpt));
+ CommandLineUtils.checkInvalidArgs(parser, options, groupOpt,
minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt));
CommandLineUtils.checkInvalidArgs(parser, options, topicOpt,
minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt));
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index c3690d953ab..f1f91217511 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
@@ -38,6 +39,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
@@ -77,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -612,6 +615,204 @@ public class ShareGroupCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ShareGroupCommand.groupStatesFromString(" , ,"));
}
+ @Test
+ public void testDeleteShareGroupOffsetsArgsWithoutTopic() {
+ String bootstrapServer = "localhost:9092";
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ // no group spec args
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete-offsets", "--group", "groupId"};
+ AtomicBoolean exited = new AtomicBoolean(false);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Option [delete-offsets] takes the
following options: [topic], [group]"));
+ exited.set(true);
+ }));
+ try {
+ getShareGroupService(cgcArgs, adminClient);
+ } finally {
+ assertTrue(exited.get());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsArgsWithoutGroup() {
+ String bootstrapServer = "localhost:9092";
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ // no group spec args
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--delete-offsets", "--topic", "t1"};
+ AtomicBoolean exited = new AtomicBoolean(false);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Option [delete-offsets] takes the
following options: [topic], [group]"));
+ exited.set(true);
+ }));
+ try {
+ getShareGroupService(cgcArgs, adminClient);
+ } finally {
+ assertTrue(exited.get());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsets() throws Exception {
+ String firstGroup = "first-group";
+ String firstTopic = "t1";
+ String secondTopic = "t2";
+ String bootstrapServer = "localhost:9092";
+
+ List<String> cgcArgs = new
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer,
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic",
secondTopic));
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupOffsetsResult result =
mock(DeleteShareGroupOffsetsResult.class);
+
+ when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
+
+
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+
when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null));
+
+ when(adminClient.deleteShareGroupOffsets(any(), any(),
any())).thenReturn(result);
+
+ try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+ TestUtils.waitForCondition(() -> {
+ Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+ String[] lines = res.getKey().trim().split("\n");
+ if (lines.length != 3 && !res.getValue().isEmpty()) {
+ return false;
+ }
+
+ List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+ List<String> expectedResultValues1 = List.of(firstTopic,
"Successful");
+ List<String> expectedResultValues2 = List.of(secondTopic,
"Successful");
+
+ return
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
&&
+
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1)
&&
+
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2);
+ }, "Expected a data row and no error in delete offsets result with
group: " + firstGroup + " and topic: " + firstTopic);
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsMultipleGroups() {
+ String firstGroup = "first-group";
+ String secondGroup = "second-group";
+ String firstTopic = "t1";
+ String secondTopic = "t2";
+ String bootstrapServer = "localhost:9092";
+
+ List<String> cgcArgs = new
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer,
"--delete-offsets", "--group", firstGroup, "--group", secondGroup, "--topic",
firstTopic, "--topic", secondTopic));
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+ service.deleteOffsets();
+ fail("Expected error was not detected while trying delete offsets
multiple groups");
+ } catch (Exception e) {
+ String expectedErrorMessage = "Found multiple arguments for option
group, but you asked for only one";
+ assertEquals(expectedErrorMessage, e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsTopLevelError() throws Exception {
+ String firstGroup = "first-group";
+ String firstTopic = "t1";
+ String secondTopic = "t2";
+ String bootstrapServer = "localhost:9092";
+
+ List<String> cgcArgs = new
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer,
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic",
secondTopic));
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupOffsetsResult result =
mock(DeleteShareGroupOffsetsResult.class);
+
+ KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+ String errorMessage = "Group g3 not found.";
+ GroupIdNotFoundException exception = new
GroupIdNotFoundException(errorMessage);
+
+ resultFuture.completeExceptionally(exception);
+ when(result.all()).thenReturn(resultFuture);
+
+ when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture);
+ when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+ when(adminClient.deleteShareGroupOffsets(any(), any(),
any())).thenReturn(result);
+
+ try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+ TestUtils.waitForCondition(() -> {
+ Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+ String[] lines = res.getKey().trim().split("\n");
+ if (lines.length != 5 && !res.getValue().isEmpty()) {
+ return false;
+ }
+
+ List<String> error = Stream.concat(
+ Stream.of("Error:"),
+ Arrays.stream(errorMessage.trim().split("\\s+"))
+ ).toList();
+
+ List<String> errorLine = new ArrayList<>(error);
+ List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+ List<String> expectedResultValue1 = new ArrayList<>();
+ expectedResultValue1.add(firstTopic);
+ expectedResultValue1.addAll(error);
+ List<String> expectedResultValue2 = new ArrayList<>();
+ expectedResultValue2.add(secondTopic);
+ expectedResultValue2.addAll(error);
+
+ return
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(errorLine) &&
+
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultHeader)
&&
+
Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(expectedResultValue1)
&&
+
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedResultValue2);
+ }, "Expected a data row and no error in delete offsets result with
group: " + firstGroup + " and topic: " + firstTopic);
+ }
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception {
+ String firstGroup = "first-group";
+ String firstTopic = "t1";
+ String secondTopic = "t2";
+ String bootstrapServer = "localhost:9092";
+
+ List<String> cgcArgs = new
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer,
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic",
secondTopic));
+ Admin adminClient = mock(KafkaAdminClient.class);
+ DeleteShareGroupOffsetsResult result =
mock(DeleteShareGroupOffsetsResult.class);
+
+ KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+ String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message();
+
+
resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+ when(result.all()).thenReturn(resultFuture);
+
+
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+ when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+ when(adminClient.deleteShareGroupOffsets(any(), any(),
any())).thenReturn(result);
+
+ try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+ TestUtils.waitForCondition(() -> {
+ Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+ String[] lines = res.getKey().trim().split("\n");
+ if (lines.length != 5 && !res.getValue().isEmpty()) {
+ return false;
+ }
+
+ List<String> error = Stream.concat(
+ Stream.of("Error:"),
+ Arrays.stream(errorMessage.trim().split("\\s+"))
+ ).toList();
+
+ List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+ List<String> expectedResultValue1 = List.of(firstTopic,
"Successful");
+ List<String> expectedResultValue2 = new ArrayList<>();
+ expectedResultValue2.add(secondTopic);
+ expectedResultValue2.addAll(error);
+
+ return
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
&&
+
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValue1)
&&
+
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValue2);
+ }, "Expected a data row and no error in delete offsets result with
group: " + firstGroup + " and topic: " + firstTopic);
+ }
+ }
+
@Test
public void testDeleteShareGroupsArgs() {
String bootstrapServer = "localhost:9092";
@@ -873,6 +1074,10 @@ public class ShareGroupCommandTest {
return () -> Assertions.assertDoesNotThrow(service::describeGroups);
}
+ private Runnable deleteOffsets(ShareGroupCommand.ShareGroupService
service) {
+ return () -> Assertions.assertDoesNotThrow(service::deleteOffsets);
+ }
+
private boolean checkArgsHeaderOutput(List<String> args, String output) {
if (!output.contains("GROUP")) {
return false;