This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dd784e7d7a1 KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets
(#19820)
dd784e7d7a1 is described below
commit dd784e7d7a1d5e915c43344d4659da06b02a0c98
Author: jimmy <[email protected]>
AuthorDate: Tue Jul 29 18:47:24 2025 +0800
KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets (#19820)
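[KAFKA-16717](https://issues.apache.org/jira/browse/KAFKA-16717): this
change completes the AlterShareGroupOffsets work on the ShareGroupCommand
side by implementing the `--reset-offsets` option, which previously threw
UnsupportedOperationException.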
Reviewers: Lan Ding <[email protected]>, Chia-Ping Tsai
<[email protected]>, TaiJuWu <[email protected]>, Andrew Schofield
<[email protected]>
---
.../requests/AlterShareGroupOffsetsResponse.java | 4 +-
.../kafka/clients/admin/AdminClientTestUtils.java | 17 +
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../coordinator/group/GroupCoordinatorService.java | 44 ++-
.../persister/InitializeShareGroupStateResult.java | 13 +
.../apache/kafka/server/util/CommandLineUtils.java | 5 +
.../java/org/apache/kafka/tools/OffsetsUtils.java | 84 +++++
.../tools/consumer/group/ConsumerGroupCommand.java | 71 +---
.../tools/consumer/group/ShareGroupCommand.java | 135 ++++++--
.../consumer/group/ShareGroupCommandOptions.java | 12 +
.../consumer/group/ShareGroupCommandTest.java | 374 +++++++++++++++++++++
11 files changed, 660 insertions(+), 101 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
index fd6feaded32..5da47d0d326 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java
@@ -83,11 +83,11 @@ public class AlterShareGroupOffsetsResponse extends
AbstractResponse {
return topicData;
}
- public Builder addPartition(String topic, int partition, Map<String,
Uuid> topicIdsToNames, Errors error) {
+ public Builder addPartition(String topic, int partition, Map<String,
Uuid> topicIdsToNames, ApiError error) {
AlterShareGroupOffsetsResponseTopic topicData =
getOrCreateTopic(topic, topicIdsToNames.get(topic));
topicData.partitions().add(new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition)
- .setErrorCode(error.code())
+ .setErrorCode(error.error().code())
.setErrorMessage(error.message()));
return this;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
index dc43a9f2bd4..02c09443370 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java
@@ -31,6 +31,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -194,6 +195,22 @@ public class AdminClientTestUtils {
return new ListShareGroupOffsetsResult(coordinatorFutures);
}
+ public static ListOffsetsResult
createListOffsetsResult(Map<TopicPartition, OffsetAndMetadata>
partitionOffsets) {
+ Map<TopicPartition,
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures =
+ partitionOffsets.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> KafkaFuture.completedFuture(
+ new ListOffsetsResult.ListOffsetsResultInfo(
+ entry.getValue().offset(),
+ System.currentTimeMillis(),
+ Optional.of(1)
+ )
+ )
+ ));
+ return new ListOffsetsResult(futures);
+ }
+
/**
* Helper to create a KafkaAdminClient with a custom HostResolver
accessible to tests outside this package.
*/
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index c6aca487404..45bd8b7e7fb 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -3852,7 +3852,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
topicError match {
case Some(error) =>
- topic.partitions.forEach(partition =>
responseBuilder.addPartition(topic.topicName, partition.partitionIndex,
metadataCache.topicNamesToIds, error.error))
+ topic.partitions.forEach(partition =>
responseBuilder.addPartition(topic.topicName, partition.partitionIndex,
metadataCache.topicNamesToIds, error))
case None =>
authorizedTopicPartitions.add(topic.duplicate)
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index be2636b2bad..ee16f1d3767 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -709,8 +709,7 @@ public class GroupCoordinatorService implements
GroupCoordinator {
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(),
result, new ShareGroupHeartbeatResponseData());
return response;
} else {
- //TODO build new AlterShareGroupOffsetsResponseData
for error response
- return response;
+ return buildErrorResponse(response, result);
}
} else {
return buildErrorResponse(request, response, exp);
@@ -718,6 +717,42 @@ public class GroupCoordinatorService implements
GroupCoordinator {
});
}
+
+ private AlterShareGroupOffsetsResponseData
buildErrorResponse(AlterShareGroupOffsetsResponseData response,
InitializeShareGroupStateResult result) {
+ AlterShareGroupOffsetsResponseData data = new
AlterShareGroupOffsetsResponseData();
+ Map<Uuid, Map<Integer, PartitionErrorData>> topicPartitionErrorsMap =
result.getErrors();
+ data.setResponses(
+ new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
+ .map(topic -> {
+
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic
topicData = new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName())
+ .setTopicId(topic.topicId());
+ topic.partitions().forEach(partition -> {
+ if (partition.errorCode() != Errors.NONE.code()) {
+ topicData.partitions().add(partition);
+ return;
+ }
+
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition
partitionData;
+ Map<Integer, PartitionErrorData> partitionErrors =
+ Optional.ofNullable(topicPartitionErrorsMap)
+ .map(map -> map.get(topic.topicId()))
+ .orElse(Collections.emptyMap());
+ PartitionErrorData error =
partitionErrors.get(partition.partitionIndex());
+ if (error == null) {
+ partitionData = partition.duplicate();
+ } else {
+ partitionData = new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition.partitionIndex())
+ .setErrorCode(error.errorCode())
+ .setErrorMessage(error.errorMessage());
+ }
+ topicData.partitions().add(partitionData);
+ });
+ return topicData;
+ })
+ .iterator()));
+ return data;
+ }
private AlterShareGroupOffsetsResponseData
buildErrorResponse(InitializeShareGroupStateParameters request,
AlterShareGroupOffsetsResponseData response, Throwable exp) {
// build new AlterShareGroupOffsetsResponseData for error response
@@ -726,13 +761,14 @@ public class GroupCoordinatorService implements
GroupCoordinator {
log.error("Unable to initialize share group state for {}, {} while
altering share group offsets", gtp.groupId(), gtp.topicsData(), exp);
Errors error = Errors.forException(exp);
data.setErrorCode(error.code())
- .setErrorMessage(error.message())
+ .setErrorMessage(exp.getMessage())
.setResponses(response.responses());
data.setResponses(
new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
.map(topic -> {
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic
topicData = new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
- .setTopicName(topic.topicName());
+ .setTopicName(topic.topicName())
+ .setTopicId(topic.topicId());
topic.partitions().forEach(partition -> {
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition
partitionData = new
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
.setPartitionIndex(partition.partitionIndex())
diff --git
a/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
b/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
index 722cf8da1da..becb552de21 100644
---
a/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
+++
b/server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.share.persister;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -59,6 +60,18 @@ public class InitializeShareGroupStateResult implements
PersisterResult {
));
}
+ public Map<Uuid, Map<Integer, PartitionErrorData>> getErrors() {
+ return topicsData.stream()
+ .collect(Collectors.toMap(
+ TopicData::topicId,
+ topicData -> topicData.partitions().stream()
+ .collect(Collectors.toMap(
+ PartitionIdData::partition,
+ partitionErrorData -> partitionErrorData
+ ))
+ ));
+ }
+
public static class Builder {
private List<TopicData<PartitionErrorData>> topicsData;
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
index 82a02482db8..6146daeb45c 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -130,6 +130,11 @@ public class CommandLineUtils {
}
}
+ public static void printErrorAndExit(String message) {
+ System.err.println(message);
+ Exit.exit(1, message);
+ }
+
public static void printUsageAndExit(OptionParser parser, String message) {
System.err.println(message);
try {
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index c529e4b6820..e37bf2804d7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -46,6 +48,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.ToIntFunction;
@@ -68,6 +71,28 @@ public class OffsetsUtils {
this.parser = parser;
}
+ public static void printOffsetsToReset(Map<String, Map<TopicPartition,
OffsetAndMetadata>> groupAssignmentsToReset) {
+ int maxGroupLen = Math.max(15,
groupAssignmentsToReset.keySet().stream().mapToInt(String::length).max().orElse(0));
+ int maxTopicLen = Math.max(15,
groupAssignmentsToReset.values().stream()
+ .flatMap(assignments -> assignments.keySet().stream())
+ .mapToInt(tp -> tp.topic().length())
+ .max()
+ .orElse(0));
+
+ String format = "%n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s
%-10s %s";
+ if (!groupAssignmentsToReset.isEmpty())
+ System.out.printf(format, "GROUP", "TOPIC", "PARTITION",
"NEW-OFFSET");
+
+ groupAssignmentsToReset.forEach((groupId, assignment) ->
+ assignment.forEach((consumerAssignment, offsetAndMetadata) ->
+ System.out.printf(format,
+ groupId,
+ consumerAssignment.topic(),
+ consumerAssignment.partition(),
+ offsetAndMetadata.offset())));
+ System.out.println();
+ }
+
public Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>>
resetPlanFromFile() {
if (opts.resetFromFileOpt != null && !opts.resetFromFileOpt.isEmpty())
{
try {
@@ -414,6 +439,55 @@ public class OffsetsUtils {
return preparedOffsetsForPartitionsWithCommittedOffset;
}
+ public void checkAllTopicPartitionsValid(Collection<TopicPartition>
partitionsToReset) {
+ // check the partitions exist
+ List<TopicPartition> partitionsNotExistList =
filterNonExistentPartitions(partitionsToReset);
+ if (!partitionsNotExistList.isEmpty()) {
+ String partitionStr =
partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+ throw new UnknownTopicOrPartitionException("The partitions \"" +
partitionStr + "\" do not exist");
+ }
+
+ // check the partitions have leader
+ List<TopicPartition> partitionsWithoutLeader =
filterNoneLeaderPartitions(partitionsToReset);
+ if (!partitionsWithoutLeader.isEmpty()) {
+ String partitionStr =
partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
+ throw new LeaderNotAvailableException("The partitions \"" +
partitionStr + "\" have no leader");
+ }
+ }
+
+ public List<TopicPartition>
filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
+ // collect all topics
+ Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+
+ try {
+ return
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+ .stream()
+ .flatMap(entry -> entry.getValue().partitions().stream()
+ .filter(partitionInfo -> partitionInfo.leader() == null)
+ .map(partitionInfo -> new TopicPartition(entry.getKey(),
partitionInfo.partition())))
+ .filter(topicPartitions::contains)
+ .toList();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<TopicPartition>
filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
+ // collect all topics
+ Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ try {
+ List<TopicPartition> existPartitions =
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+ .stream()
+ .flatMap(entry -> entry.getValue().partitions().stream()
+ .map(partitionInfo -> new TopicPartition(entry.getKey(),
partitionInfo.partition())))
+ .toList();
+
+ return topicPartitions.stream().filter(tp ->
!existPartitions.contains(tp)).toList();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
int t = (int) opts.timeoutMsOpt;
return options.timeoutMs(t);
@@ -469,5 +543,15 @@ public class OffsetsUtils {
this.resetShiftByOpt = resetShiftByOpt;
this.timeoutMsOpt = timeoutMsOpt;
}
+
+ public OffsetsUtilsOptions(
+ List<String> groupOpt,
+ List<String> resetToDatetimeOpt,
+ long timeoutMsOpt) {
+
+ this.groupOpt = groupOpt;
+ this.resetToDatetimeOpt = resetToDatetimeOpt;
+ this.timeoutMsOpt = timeoutMsOpt;
+ }
}
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 2cfffb9fe78..1f29cdd8156 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -41,8 +41,6 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -133,7 +131,7 @@ public class ConsumerGroupCommand {
String exported =
consumerGroupService.exportOffsetsToCsv(offsetsToReset);
System.out.println(exported);
} else
- printOffsetsToReset(offsetsToReset);
+ OffsetsUtils.printOffsetsToReset(offsetsToReset);
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
consumerGroupService.deleteOffsets();
}
@@ -179,21 +177,6 @@ public class ConsumerGroupCommand {
e.ifPresent(Throwable::printStackTrace);
}
- static void printOffsetsToReset(Map<String, Map<TopicPartition,
OffsetAndMetadata>> groupAssignmentsToReset) {
- String format = "%n%-30s %-30s %-10s %-15s";
- if (!groupAssignmentsToReset.isEmpty())
- System.out.printf(format, "GROUP", "TOPIC", "PARTITION",
"NEW-OFFSET");
-
- groupAssignmentsToReset.forEach((groupId, assignment) ->
- assignment.forEach((consumerAssignment, offsetAndMetadata) ->
- System.out.printf(format,
- groupId,
- consumerAssignment.topic(),
- consumerAssignment.partition(),
- offsetAndMetadata.offset())));
- System.out.println();
- }
-
@SuppressWarnings("ClassFanOutComplexity")
static class ConsumerGroupService implements AutoCloseable {
final ConsumerGroupCommandOptions opts;
@@ -615,7 +598,7 @@ public class ConsumerGroupCommand {
consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt,
leaderEpoch);
};
- List<TopicPartition> topicPartitionsWithoutLeader =
filterNoneLeaderPartitions(topicPartitions);
+ List<TopicPartition> topicPartitionsWithoutLeader =
offsetsUtils.filterNoneLeaderPartitions(topicPartitions);
List<TopicPartition> topicPartitionsWithLeader =
topicPartitions.stream().filter(tp ->
!topicPartitionsWithoutLeader.contains(tp)).toList();
// prepare data for partitions with leaders
@@ -645,22 +628,6 @@ public class ConsumerGroupCommand {
.collect(Collectors.toList());
}
- private List<TopicPartition>
filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
- // collect all topics
- Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
-
- try {
- return
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
- .stream()
- .flatMap(entry ->
entry.getValue().partitions().stream()
- .filter(partitionInfo ->
partitionInfo.leader() == null)
- .map(partitionInfo -> new
TopicPartition(entry.getKey(), partitionInfo.partition())))
- .toList();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
List<String> groupIds = opts.options.has(opts.allGroupsOpt)
? listConsumerGroups()
@@ -1002,7 +969,7 @@ public class ConsumerGroupCommand {
private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId, Collection<TopicPartition>
partitionsToReset) {
// ensure all partitions are valid, otherwise throw a runtime
exception
- checkAllTopicPartitionsValid(partitionsToReset);
+ offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
if (opts.options.has(opts.resetToOffsetOpt)) {
return offsetsUtils.resetToOffset(partitionsToReset);
@@ -1028,38 +995,6 @@ public class ConsumerGroupCommand {
return null;
}
- private void checkAllTopicPartitionsValid(Collection<TopicPartition>
partitionsToReset) {
- // check the partitions exist
- List<TopicPartition> partitionsNotExistList =
filterNonExistentPartitions(partitionsToReset);
- if (!partitionsNotExistList.isEmpty()) {
- String partitionStr =
partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
- throw new UnknownTopicOrPartitionException("The partitions \""
+ partitionStr + "\" do not exist");
- }
-
- // check the partitions have leader
- List<TopicPartition> partitionsWithoutLeader =
filterNoneLeaderPartitions(partitionsToReset);
- if (!partitionsWithoutLeader.isEmpty()) {
- String partitionStr =
partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
- throw new LeaderNotAvailableException("The partitions \"" +
partitionStr + "\" have no leader");
- }
- }
-
- private List<TopicPartition>
filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
- // collect all topics
- Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
- try {
- List<TopicPartition> existPartitions =
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
- .stream()
- .flatMap(entry ->
entry.getValue().partitions().stream()
- .map(partitionInfo -> new
TopicPartition(entry.getKey(), partitionInfo.partition())))
- .toList();
-
- return topicPartitions.stream().filter(element ->
!existPartitions.contains(element)).toList();
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
String exportOffsetsToCsv(Map<String, Map<TopicPartition,
OffsetAndMetadata>> assignments) {
boolean isSingleGroupQuery =
opts.options.valuesOf(opts.groupOpt).size() == 1;
ObjectWriter csvWriter = isSingleGroupQuery
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 8996f92b84e..95723665032 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -33,12 +33,14 @@ import
org.apache.kafka.clients.admin.ShareMemberDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.tools.OffsetsUtils;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
@@ -92,7 +94,7 @@ public class ShareGroupCommand {
} else if (opts.options.has(opts.deleteOpt)) {
shareGroupService.deleteShareGroups();
} else if (opts.options.has(opts.resetOffsetsOpt)) {
- throw new UnsupportedOperationException("--reset-offsets
option is not yet implemented");
+ shareGroupService.resetOffsets();
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
shareGroupService.deleteOffsets();
}
@@ -123,6 +125,7 @@ public class ShareGroupCommand {
static class ShareGroupService implements AutoCloseable {
final ShareGroupCommandOptions opts;
private final Admin adminClient;
+ private final OffsetsUtils offsetsUtils;
public ShareGroupService(ShareGroupCommandOptions opts, Map<String,
String> configOverrides) {
this.opts = opts;
@@ -131,11 +134,20 @@ public class ShareGroupCommand {
} catch (IOException e) {
throw new RuntimeException(e);
}
+ this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
}
public ShareGroupService(ShareGroupCommandOptions opts, Admin
adminClient) {
this.opts = opts;
this.adminClient = adminClient;
+ this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
+ }
+
+ private OffsetsUtils.OffsetsUtilsOptions
getOffsetsUtilsOptions(ShareGroupCommandOptions opts) {
+ return
+ new
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+ opts.options.valuesOf(opts.resetToDatetimeOpt),
+ opts.options.valueOf(opts.timeoutMsOpt));
}
public void listGroups() throws ExecutionException,
InterruptedException {
@@ -366,6 +378,75 @@ public class ShareGroupCommand {
return new SimpleImmutableEntry<>(topLevelException,
topicLevelResult);
}
+ void resetOffsets() {
+ String groupId = opts.options.valueOf(opts.groupOpt);
+ try {
+ ShareGroupDescription shareGroupDescription =
describeShareGroups(List.of(groupId)).get(groupId);
+ if
(!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) ||
GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
+ CommandLineUtils.printErrorAndExit(String.format("Share
group '%s' is not empty.", groupId));
+ }
+ Map<TopicPartition, OffsetAndMetadata> offsetsToReset =
prepareOffsetsToReset(groupId);
+ if (offsetsToReset == null) {
+ return;
+ }
+ boolean dryRun = opts.options.has(opts.dryRunOpt) ||
!opts.options.has(opts.executeOpt);
+ if (!dryRun) {
+ adminClient.alterShareGroupOffsets(groupId,
+ offsetsToReset.entrySet().stream()
+ .collect(Collectors.toMap(
+ Entry::getKey, entry ->
entry.getValue().offset()
+ ))
+ ).all().get();
+ }
+ OffsetsUtils.printOffsetsToReset(Map.of(groupId,
offsetsToReset));
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ } catch (ExecutionException ee) {
+ Throwable cause = ee.getCause();
+ if (cause instanceof KafkaException) {
+ CommandLineUtils.printErrorAndExit(cause.getMessage());
+ } else {
+ throw new RuntimeException(cause);
+ }
+ }
+ }
+
+ protected Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId) throws ExecutionException,
InterruptedException {
+ Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
+ Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+ Collection<TopicPartition> partitionsToReset;
+
+ if (opts.options.has(opts.topicOpt)) {
+ partitionsToReset =
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
+ Set<String> subscribedTopics =
offsetsByTopicPartitions.keySet().stream()
+ .map(TopicPartition::topic)
+ .collect(Collectors.toSet());
+ Set<String> resetTopics = partitionsToReset.stream()
+ .map(TopicPartition::topic)
+ .collect(Collectors.toSet());
+ if (!subscribedTopics.containsAll(resetTopics)) {
+ CommandLineUtils
+ .printErrorAndExit(String.format("Share group '%s' is
not subscribed to topic '%s'.",
+ groupId, resetTopics.stream().filter(topic ->
!subscribedTopics.contains(topic)).collect(Collectors.joining(", "))));
+ return null;
+ }
+ } else {
+ partitionsToReset = offsetsByTopicPartitions.keySet();
+ }
+
+ offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
+ if (opts.options.has(opts.resetToEarliestOpt)) {
+ return offsetsUtils.resetToEarliest(partitionsToReset);
+ } else if (opts.options.has(opts.resetToLatestOpt)) {
+ return offsetsUtils.resetToLatest(partitionsToReset);
+ } else if (opts.options.has(opts.resetToDatetimeOpt)) {
+ return offsetsUtils.resetToDateTime(partitionsToReset);
+ }
+ CommandLineUtils
+ .printUsageAndExit(opts.parser, String.format("Option '%s'
requires one of the following scenarios: %s", opts.resetOffsetsOpt,
opts.allResetOffsetScenarioOpts));
+ return null;
+ }
+
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
return options.timeoutMs(t);
@@ -396,34 +477,11 @@ public class ShareGroupCommand {
TreeMap<String, Entry<ShareGroupDescription,
Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
shareGroups.forEach((groupId, shareGroup) -> {
- ListShareGroupOffsetsSpec offsetsSpec = new
ListShareGroupOffsetsSpec();
- Map<String, ListShareGroupOffsetsSpec> groupSpecs = new
HashMap<>();
- groupSpecs.put(groupId, offsetsSpec);
+ Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
try {
Map<TopicPartition, OffsetAndMetadata> startOffsets =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
-
- Set<SharePartitionOffsetInformation> partitionOffsets =
new HashSet<>();
-
- startOffsets.forEach((tp, offsetAndMetadata) -> {
- if (offsetAndMetadata != null) {
- partitionOffsets.add(new
SharePartitionOffsetInformation(
- groupId,
- tp.topic(),
- tp.partition(),
- Optional.of(offsetAndMetadata.offset()),
- offsetAndMetadata.leaderEpoch()
- ));
- } else {
- partitionOffsets.add(new
SharePartitionOffsetInformation(
- groupId,
- tp.topic(),
- tp.partition(),
- Optional.empty(),
- Optional.empty()
- ));
- }
- });
+ Set<SharePartitionOffsetInformation> partitionOffsets =
mapOffsetsToSharePartitionInformation(groupId, startOffsets);
groupOffsets.put(groupId, new
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
} catch (InterruptedException | ExecutionException e) {
@@ -434,6 +492,31 @@ public class ShareGroupCommand {
return groupOffsets;
}
+ private static Set<SharePartitionOffsetInformation>
mapOffsetsToSharePartitionInformation(String groupId, Map<TopicPartition,
OffsetAndMetadata> startOffsets) {
+ Set<SharePartitionOffsetInformation> partitionOffsets = new
HashSet<>();
+
+ startOffsets.forEach((tp, offsetAndMetadata) -> {
+ if (offsetAndMetadata != null) {
+ partitionOffsets.add(new SharePartitionOffsetInformation(
+ groupId,
+ tp.topic(),
+ tp.partition(),
+ Optional.of(offsetAndMetadata.offset()),
+ offsetAndMetadata.leaderEpoch()
+ ));
+ } else {
+ partitionOffsets.add(new SharePartitionOffsetInformation(
+ groupId,
+ tp.topic(),
+ tp.partition(),
+ Optional.empty(),
+ Optional.empty()
+ ));
+ }
+ });
+ return partitionOffsets;
+ }
+
private void printOffsets(TreeMap<String, Entry<ShareGroupDescription,
Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) {
offsets.forEach((groupId, tuple) -> {
Collection<SharePartitionOffsetInformation> offsetsInfo =
tuple.getValue().stream()
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index f155a9c4b5e..e85822d4971 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -88,6 +88,7 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
final OptionSpec<Void> verboseOpt;
final Set<OptionSpec<?>> allGroupSelectionScopeOpts;
+ final Set<OptionSpec<?>> allTopicSelectionScopeOpts;
final Set<OptionSpec<?>> allShareGroupLevelOpts;
final Set<OptionSpec<?>> allResetOffsetScenarioOpts;
final Set<OptionSpec<?>> allDeleteOffsetsOpts;
@@ -143,6 +144,7 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
.availableIf(describeOpt);
allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt);
+ allTopicSelectionScopeOpts = Set.of(topicOpt, allTopicsOpt);
allShareGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt,
resetOffsetsOpt);
allResetOffsetScenarioOpts = Set.of(resetToDatetimeOpt,
resetToEarliestOpt, resetToLatestOpt);
allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt);
@@ -200,6 +202,16 @@ public class ShareGroupCommandOptions extends
CommandDefaultOptions {
CommandLineUtils.printUsageAndExit(parser,
"Option " + resetOffsetsOpt + " takes the option: " +
groupOpt);
+ if (!options.has(topicOpt) && !options.has(allTopicsOpt)) {
+ CommandLineUtils.printUsageAndExit(parser,
+ "Option " + resetOffsetsOpt + " takes one of these
options: " +
allTopicSelectionScopeOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
+ }
+
+ if (!options.has(resetToEarliestOpt) &&
!options.has(resetToLatestOpt) && !options.has(resetToDatetimeOpt)) {
+ CommandLineUtils.printUsageAndExit(parser,
+ "Option " + resetOffsetsOpt + " takes one of these
options: " +
allResetOffsetScenarioOpts.stream().map(Object::toString).sorted().collect(Collectors.joining(",
")));
+ }
+
CommandLineUtils.checkInvalidArgs(parser, options,
resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
CommandLineUtils.checkInvalidArgs(parser, options,
resetToEarliestOpt, minus(allResetOffsetScenarioOpts, resetToEarliestOpt));
CommandLineUtils.checkInvalidArgs(parser, options,
resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt));
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 19a11ce0f90..e80071907e7 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -19,25 +19,34 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
+import org.apache.kafka.clients.admin.DescribeTopicsOptions;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
+import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
import org.apache.kafka.clients.admin.ShareMemberDescription;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.protocol.Errors;
@@ -50,6 +59,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
import java.util.ArrayList;
@@ -67,10 +77,12 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import java.util.stream.Stream;
import joptsimple.OptionException;
+import static org.apache.kafka.common.KafkaFuture.completedFuture;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -79,7 +91,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -1052,6 +1066,366 @@ public class ShareGroupCommandTest {
assertEquals(expectedResults, service.deleteShareGroups());
}
}
+
+ @Test
+ public void testAlterShareGroupMultipleTopicsSuccess() {
+ String group = "share-group";
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic1, "--topic",
topic2, "--group", group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1,
0), new OffsetAndMetadata(10L), new TopicPartition(topic1, 1), new
OffsetAndMetadata(10L),
+ new TopicPartition(topic2, 0), new OffsetAndMetadata(0L)))
+ )
+ );
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+ AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
+ TopicPartition tp0 = new TopicPartition(topic1, 0);
+ TopicPartition tp1 = new TopicPartition(topic1, 1);
+ TopicPartition tp2 = new TopicPartition(topic2, 0);
+ Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp0,
new OffsetAndMetadata(0L), tp1, new OffsetAndMetadata(0L),
+ tp2, new OffsetAndMetadata(0L));
+ ListOffsetsResult listOffsetsResult =
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+ when(adminClient.listOffsets(any(),
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+ ShareGroupDescription exp = new ShareGroupDescription(
+ group,
+ List.of(),
+ GroupState.EMPTY,
+ new Node(0, "host1", 9090), 0, 0);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
+ when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+ Map<String, TopicDescription> d1 = Map.of(
+ topic1, new TopicDescription(topic1, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
+ new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of()))
+ ));
+ Map<String, TopicDescription> d2 = Map.of(
+ topic2, new TopicDescription(topic2, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+ )));
+ DescribeTopicsResult topicsResult1 = mock(DescribeTopicsResult.class);
+ DescribeTopicsResult topicsResult2 = mock(DescribeTopicsResult.class);
+ when(topicsResult1.allTopicNames()).thenReturn(completedFuture(d1));
+ when(topicsResult2.allTopicNames()).thenReturn(completedFuture(d2));
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(topicsResult1, topicsResult2);
+
when(adminClient.describeTopics(anyCollection())).thenReturn(topicsResult1,
topicsResult2);
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.resetOffsets();
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+ verify(adminClient).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
+ verify(alterShareGroupOffsetsResult, times(1)).all();
+
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
+ }
+ }
+
+ @Test
+ public void testAlterShareGroupToLatestSuccess() {
+ String group = "share-group";
+ String topic = "topic";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-latest", "--execute", "--topic", topic, "--group",
group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+ TopicPartition t1 = new TopicPartition(topic, 0);
+ TopicPartition t2 = new TopicPartition(topic, 1);
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(t1, new
OffsetAndMetadata(10L), t2, new OffsetAndMetadata(10L)))
+ )
+ );
+ Map<String, TopicDescription> descriptions = Map.of(
+ topic, new TopicDescription(topic, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
+ new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of()))
+ ));
+ DescribeTopicsResult describeTopicResult =
mock(DescribeTopicsResult.class);
+
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+ AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
+ Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(t1,
new OffsetAndMetadata(40L), t2, new OffsetAndMetadata(40L));
+ ListOffsetsResult listOffsetsResult =
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+ when(adminClient.listOffsets(any(),
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+ ShareGroupDescription exp = new ShareGroupDescription(
+ group,
+ List.of(),
+ GroupState.EMPTY,
+ new Node(0, "host1", 9090), 0, 0);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
+ when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+ Function<Collection<TopicPartition>,
ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher =
expectedPartitions ->
+ topicPartitionOffsets -> topicPartitionOffsets != null &&
topicPartitionOffsets.keySet().equals(expectedPartitions) &&
+ topicPartitionOffsets.values().stream().allMatch(offsetSpec ->
offsetSpec instanceof OffsetSpec.LatestSpec);
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.resetOffsets();
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+ verify(adminClient,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(t1,
t2))), any());
+ verify(alterShareGroupOffsetsResult, times(1)).all();
+
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
+ }
+ }
+
+ @Test
+ public void testAlterShareGroupAllTopicsToDatetimeSuccess() {
+ String group = "share-group";
+ String topic1 = "topic1";
+ String topic2 = "topic2";
+ String topic3 = "topic3";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-datetime", "2025-07-20T01:20:38.198", "--execute",
"--all-topics", "--group", group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1,
0), new OffsetAndMetadata(5L), new TopicPartition(topic1, 1), new
OffsetAndMetadata(10L),
+ new TopicPartition(topic2, 0), new OffsetAndMetadata(10L),
new TopicPartition(topic3, 0), new OffsetAndMetadata(10L)))
+ )
+ );
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
+ Set<String> topics = Set.of(topic1, topic2, topic3);
+ when(listTopicsResult.names()).thenReturn(completedFuture(topics));
+ when(adminClient.listTopics()).thenReturn(listTopicsResult);
+
+ AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
+ TopicPartition tp1 = new TopicPartition(topic1, 0);
+ TopicPartition tp2 = new TopicPartition(topic1, 1);
+ TopicPartition tp3 = new TopicPartition(topic2, 0);
+ TopicPartition tp4 = new TopicPartition(topic3, 0);
+ Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp1,
new OffsetAndMetadata(10L), tp2, new OffsetAndMetadata(15L),
+ tp3, new OffsetAndMetadata(15L), tp4, new OffsetAndMetadata(15L));
+ ListOffsetsResult listOffsetsResult =
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+ when(adminClient.listOffsets(any(),
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+ Map<String, TopicDescription> descriptions = Map.of(
+ topic1, new TopicDescription(topic1, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
+ new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of())
+ )),
+ topic2, new TopicDescription(topic2, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+ )),
+ topic3, new TopicDescription(topic3, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()
+ ))
+ ));
+ DescribeTopicsResult describeTopicResult =
mock(DescribeTopicsResult.class);
+
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+
+ ShareGroupDescription exp = new ShareGroupDescription(
+ group,
+ List.of(),
+ GroupState.EMPTY,
+ new Node(0, "host1", 9090), 0, 0);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
+ when(adminClient.describeShareGroups(anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+ Function<Collection<TopicPartition>,
ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher =
expectedPartitions ->
+ topicPartitionOffsets -> topicPartitionOffsets != null &&
topicPartitionOffsets.keySet().equals(expectedPartitions) &&
+ topicPartitionOffsets.values().stream().allMatch(offsetSpec ->
offsetSpec instanceof OffsetSpec.TimestampSpec);
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.resetOffsets();
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+ verify(adminClient,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(tp1,
tp2, tp3, tp4))), any());
+ verify(alterShareGroupOffsetsResult, times(1)).all();
+
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
+ }
+ }
+
+ @Test
+ public void testResetOffsetsDryRunSuccess() {
+ String group = "share-group";
+ String topic = "topic";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-earliest", "--dry-run", "--topic", topic, "--group",
group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition(topic,
0), new OffsetAndMetadata(10L)))
+ )
+ );
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+ Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(new
TopicPartition(topic, 0), new OffsetAndMetadata(0L));
+ ListOffsetsResult listOffsetsResult =
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+ when(adminClient.listOffsets(any(),
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+ AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mock(AlterShareGroupOffsetsResult.class);
+
when(alterShareGroupOffsetsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
+ when(adminClient.alterShareGroupOffsets(any(),
any())).thenReturn(alterShareGroupOffsetsResult);
+ Map<String, TopicDescription> descriptions = Map.of(
+ topic, new TopicDescription(topic, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+ )));
+ DescribeTopicsResult describeTopicResult =
mock(DescribeTopicsResult.class);
+
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+
+ ShareGroupDescription exp = new ShareGroupDescription(
+ group,
+ List.of(),
+ GroupState.EMPTY,
+ new Node(0, "host1", 9090), 0, 0);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
+ when(adminClient.describeShareGroups(anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+
+ try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
+ service.resetOffsets();
+ verify(adminClient, times(0)).alterShareGroupOffsets(any(), any());
+ }
+ }
+
+ @Test
+ public void testAlterShareGroupOffsetsFailureWithoutTopic() {
+ String bootstrapServer = "localhost:9092";
+ String group = "share-group";
+ Admin adminClient = mock(KafkaAdminClient.class);
+ // no topic spec args (neither --topic nor --all-topics)
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-earliest", "--execute", "--group", group};
+ AtomicBoolean exited = new AtomicBoolean(false);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Option [reset-offsets] takes one of
these options: [all-topics], [topic]"));
+ exited.set(true);
+ }));
+ try {
+ getShareGroupService(cgcArgs, adminClient);
+ } finally {
+ assertTrue(exited.get());
+ }
+ }
+
+ @Test
+ public void testAlterShareGroupOffsetsFailureWithNoneEmptyGroup() {
+ String group = "share-group";
+ String topic = "topic";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group",
group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
+ )
+ );
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
+ Set<String> topics = Set.of("topic");
+ when(listTopicsResult.names()).thenReturn(completedFuture(topics));
+ when(adminClient.listTopics()).thenReturn(listTopicsResult);
+
+ ShareGroupDescription exp = new ShareGroupDescription(
+ group,
+ List.of(new ShareMemberDescription("memid1", "clId1", "host1", new
ShareMemberAssignment(
+ Set.of(new TopicPartition("topic", 0))
+ ), 0)),
+ GroupState.STABLE,
+ new Node(0, "host1", 9090), 0, 0);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
+ when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+
+ Exit.setExitProcedure((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Share group 'share-group' is not
empty."));
+ throw new IllegalArgumentException(message);
+ });
+ assertThrows(IllegalArgumentException.class, () ->
getShareGroupService(cgcArgs, adminClient).resetOffsets());
+ }
+
+ @Test
+ public void testAlterShareGroupOffsetsArgsFailureWithoutResetOffsetsArgs()
{
+ String bootstrapServer = "localhost:9092";
+ String group = "share-group";
+ Admin adminClient = mock(KafkaAdminClient.class);
+ // no reset scenario args (--to-datetime, --to-earliest or --to-latest)
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--execute", "--reset-offsets", "--group", group, "--topic", "topic"};
+ AtomicBoolean exited = new AtomicBoolean(false);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Option [reset-offsets] takes one of
these options: [to-datetime], [to-earliest], [to-latest]"));
+ exited.set(true);
+ }));
+ try {
+ getShareGroupService(cgcArgs, adminClient);
+ } finally {
+ assertTrue(exited.get());
+ }
+ }
+
+ @Test
+ public void testAlterShareGroupFailureFailureWithNonExistentTopic() {
+ String group = "share-group";
+ String topic = "none";
+ String bootstrapServer = "localhost:9092";
+ String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer,
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group",
group};
+ Admin adminClient = mock(KafkaAdminClient.class);
+
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ group,
+ KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
+ )
+ );
+ ShareGroupDescription exp = new ShareGroupDescription(
+ group,
+ List.of(new ShareMemberDescription("memid1", "clId1", "host1", new
ShareMemberAssignment(
+ Set.of(new TopicPartition(topic, 0))
+ ), 0)),
+ GroupState.EMPTY,
+ new Node(0, "host1", 9090), 0, 0);
+ DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
+
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
+ when(adminClient.describeShareGroups(any(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+ AtomicBoolean exited = new AtomicBoolean(false);
+
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ Map<String, TopicDescription> descriptions = Map.of(
+ topic, new TopicDescription(topic, false, List.of(
+ new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+ )));
+ DescribeTopicsResult describeTopicResult =
mock(DescribeTopicsResult.class);
+
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+ Exit.setExitProcedure(((statusCode, message) -> {
+ assertNotEquals(0, statusCode);
+ assertTrue(message.contains("Share group 'share-group' is not
subscribed to topic 'none'"));
+ exited.set(true);
+ }));
+ try {
+ getShareGroupService(cgcArgs, adminClient).resetOffsets();
+ } finally {
+ assertTrue(exited.get());
+ }
+ }
+
+ private AlterShareGroupOffsetsResult mockAlterShareGroupOffsets(Admin
client, String groupId) {
+ AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mock(AlterShareGroupOffsetsResult.class);
+ KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+ resultFuture.complete(null);
+ when(alterShareGroupOffsetsResult.all()).thenReturn(resultFuture);
+ when(client.alterShareGroupOffsets(eq(groupId),
any())).thenReturn(alterShareGroupOffsetsResult);
+ return alterShareGroupOffsetsResult;
+ }
private void mockListShareGroups(Admin client, LinkedHashMap<String,
GroupState> groupIds) {
ListGroupsResult listResult = mock(ListGroupsResult.class);