apoorvmittal10 commented on code in PR #18671:
URL: https://github.com/apache/kafka/pull/18671#discussion_r1935632909
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10441,6 +10443,125 @@ class KafkaApisTest extends Logging {
})
}
+ @Test
+ def testDescribeShareGroupOffsetsReturnsUnsupportedVersion(): Unit = {
+ val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+ util.List.of(new
DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
+ )
+
+ val requestChannelRequest = buildRequest(new
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest,
true).build())
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () =>
KRaftVersion.KRAFT_VERSION_0)
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val response =
verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+ response.data.responses.forEach(topic =>
topic.partitions().forEach(partition =>
assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode())))
+ }
+
+ @Test
+ def testDescribeShareGroupOffsetsRequestsAuthorizationFailed(): Unit = {
+ val describeShareGroupOffsetsRequest = new
DescribeShareGroupOffsetsRequestData().setGroupId("group").setTopics(
+ util.List.of(new
DescribeShareGroupOffsetsRequestTopic().setTopicName("topic-1").setPartitions(util.List.of(1)))
+ )
+
+ val requestChannelRequest = buildRequest(new
DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest,
true).build())
+
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
+ .thenReturn(Seq(AuthorizationResult.DENIED).asJava)
Review Comment:
Can we avoid conversions?
```suggestion
.thenReturn(util.List.of(AuthorizationResult.ALLOWED))
```
##########
tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java:
##########
@@ -212,17 +216,35 @@ public void testDescribeOffsetsOfAllExistingGroups()
throws Exception {
), 0)),
GroupState.STABLE,
new Node(0, "host1", 9090), 0, 0);
- ListOffsetsResult resultOffsets = new ListOffsetsResult(
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult1 =
AdminClientTestUtils.createListShareGroupOffsetsResult(
Map.of(
- new TopicPartition("topic1", 0),
- KafkaFuture.completedFuture(new
ListOffsetsResult.ListOffsetsResultInfo(0, 0, Optional.empty()))
- ));
+ firstGroup,
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), 0L))
+ )
+ );
+ ListShareGroupOffsetsResult listShareGroupOffsetsResult2 =
AdminClientTestUtils.createListShareGroupOffsetsResult(
+ Map.of(
+ secondGroup,
+ KafkaFuture.completedFuture(Map.of(new
TopicPartition("topic1", 0), 0L))
+ )
+ );
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(firstGroupListing,
secondGroupListing)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup,
KafkaFuture.completedFuture(exp1), secondGroup,
KafkaFuture.completedFuture(exp2)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(resultOffsets);
+
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(new
Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws
Throwable {
+ Map<String, Object> argument = invocation.getArgument(0);
+ if (argument.containsKey(firstGroup)) {
+ return listShareGroupOffsetsResult1;
+ } else if (argument.containsKey(secondGroup)) {
+ return listShareGroupOffsetsResult2;
+ }
+ return null;
+ }
+ });
Review Comment:
```suggestion
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(
invocation -> {
Map<String, Object> argument = invocation.getArgument(0);
if (argument.containsKey(firstGroup)) {
return listShareGroupOffsetsResult1;
} else if (argument.containsKey(secondGroup)) {
return listShareGroupOffsetsResult2;
}
return null;
});
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2409,4 +2191,323 @@ public void
testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti
future.get()
);
}
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithNoOpPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE)))
+ )
+ );
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(21)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(Errors.NONE.message())))
+ )
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setStateEpoch(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(Errors.NONE.message())))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError()
{
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.failedFuture(new Exception("Unable
to validate read state summary request")));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(future, Exception.class, "Unable to validate read
state summary request");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(future, IllegalStateException.class, "Result is
null for the read state summary");
+ }
+
+ @Test
+ public void
testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
+ new
ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build();
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+
.thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(future, IllegalStateException.class, "Result is
null for the read state summary");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(0)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())))
+ )
+ );
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsMetadataImageNull() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+
+ // Forcing a null Metadata Image
+ service.onNewMetadataImage(null, null);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(0)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+ )
+ );
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @FunctionalInterface
+ interface TriFunction<A, B, C, R> {
Review Comment:
```suggestion
private interface TriFunction<A, B, C, R> {
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsRequest;
+import org.apache.kafka.common.requests.DescribeShareGroupOffsetsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
+import org.apache.kafka.common.utils.LogContext;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the handler for {@link
KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call
+ */
+public class ListShareGroupOffsetsHandler extends
AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, Long>> {
+
+ private final Map<String, ListShareGroupOffsetsSpec> groupSpecs;
+ private final Logger log;
+ private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+
+ public ListShareGroupOffsetsHandler(
+ Map<String, ListShareGroupOffsetsSpec> groupSpecs,
+ LogContext logContext) {
+ this.groupSpecs = groupSpecs;
+ this.log = logContext.logger(ListShareGroupOffsetsHandler.class);
+ this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP,
logContext);
+ }
+
+ public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey,
Map<TopicPartition, Long>> newFuture(Collection<String> groupIds) {
+ return AdminApiFuture.forKeys(coordinatorKeys(groupIds));
+ }
+
+ @Override
+ public String apiName() {
+ return "describeShareGroupOffsets";
+ }
+
+ @Override
+ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
+ return lookupStrategy;
+ }
+
+ @Override
+ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int
coordinatorId, Set<CoordinatorKey> keys) {
+ List<String> groupIds = keys.stream().map(key -> {
+ if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
+ throw new IllegalArgumentException("Invalid group coordinator
key " + key +
+ " when building `DescribeShareGroupOffsets` request");
+ }
+ return key.idValue;
+ }).collect(Collectors.toList());
+ // The DescribeShareGroupOffsetsRequest only includes a single group
ID at this point, which is likely a mistake to be fixing a follow-on PR.
+ String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
+ if (groupId == null) {
+ throw new IllegalArgumentException("Missing group id in request");
+ }
+ ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic>
topics =
+ spec.topicPartitions().stream().map(
+ topicPartition -> new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(topicPartition.topic())
+ .setPartitions(List.of(topicPartition.partition()))
+ ).collect(Collectors.toList());
+ DescribeShareGroupOffsetsRequestData data = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(topics);
+ return new DescribeShareGroupOffsetsRequest.Builder(data, true);
+ }
+
+ @Override
+ public ApiResult<CoordinatorKey, Map<TopicPartition, Long>>
handleResponse(Node coordinator,
+
Set<CoordinatorKey> groupIds,
+
AbstractResponse abstractResponse) {
+ final DescribeShareGroupOffsetsResponse response =
(DescribeShareGroupOffsetsResponse) abstractResponse;
+ final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new
HashMap<>();
+ final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+
+ for (CoordinatorKey groupId : groupIds) {
+ Map<TopicPartition, Long> data = new HashMap<>();
+ response.data().responses().stream().map(
+ describedTopic ->
+ describedTopic.partitions().stream().map(
+ partition ->
+ data.put(new
TopicPartition(describedTopic.topicName(), partition.partitionIndex()),
partition.startOffset())
+ ).collect(Collectors.toList())
+ ).collect(Collectors.toList());
+ completed.put(groupId, data);
+ }
+ return new ApiResult<>(completed, failed, new ArrayList<>());
Review Comment:
```suggestion
return new ApiResult<>(completed, failed, Collections.emptyList());
```
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -8970,4 +8975,85 @@ public void testRemoveRaftVoterRequest(boolean fail,
boolean sendClusterId) thro
assertEquals(Uuid.fromString("YAfa4HClT3SIIW2klIUspg"),
requestData.get().voterDirectoryId());
}
}
+
+ @Test
+ public void testListShareGroupOffsetsOptionsWithBatchedApi() {
+ final Cluster cluster = mockCluster(3, 0);
+ final Time time = new MockTime();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster,
+ AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ final List<TopicPartition> partitions =
Collections.singletonList(new TopicPartition("A", 0));
+ final ListShareGroupOffsetsOptions options = new
ListShareGroupOffsetsOptions();
+
+ final ListShareGroupOffsetsSpec groupSpec = new
ListShareGroupOffsetsSpec()
+ .topicPartitions(partitions);
+ Map<String, ListShareGroupOffsetsSpec> groupSpecs = new
HashMap<>();
+ groupSpecs.put(GROUP_ID, groupSpec);
+
+ env.adminClient().listShareGroupOffsets(groupSpecs, options);
+
+ final MockClient mockClient = env.kafkaClient();
+ waitForRequest(mockClient, ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ assertNotNull(clientRequest);
+ DescribeShareGroupOffsetsRequestData data =
((DescribeShareGroupOffsetsRequest.Builder)
clientRequest.requestBuilder()).build().data();
+ assertEquals(GROUP_ID, data.groupId());
+ assertEquals(Collections.singletonList("A"),
+
data.topics().stream().map(DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic::topicName).collect(Collectors.toList()));
+ } catch (Exception e) {
+ fail(e);
+ }
Review Comment:
Why do you need the catch block? If any exception is thrown in the method
then will it not fail the test anyways? Change the method signature to `public
void testListShareGroupOffsetsOptionsWithBatchedApi() throws Exception {`
##########
server-common/src/main/java/org/apache/kafka/server/share/persister/ReadShareGroupStateSummaryParameters.java:
##########
@@ -58,4 +59,16 @@ public ReadShareGroupStateSummaryParameters build() {
return new
ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+ ReadShareGroupStateSummaryParameters that =
(ReadShareGroupStateSummaryParameters) o;
+ return Objects.equals(groupTopicPartitionData,
that.groupTopicPartitionData);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(groupTopicPartitionData);
+ }
Review Comment:
Seems you missed it.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2409,4 +2191,323 @@ public void
testShareGroupDescribeCoordinatorNotActive() throws ExecutionExcepti
future.get()
);
}
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithNoOpPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+ .setErrorCode(PartitionFactory.DEFAULT_ERROR_CODE)
+
.setErrorMessage(PartitionFactory.DEFAULT_ERR_MESSAGE)))
+ )
+ );
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersister() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryRequestData
readShareGroupStateSummaryRequestData = new
ReadShareGroupStateSummaryRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryRequestData.PartitionData()
+ .setPartition(partition)))));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(21)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(Errors.NONE.message())))
+ )
+ );
+
+ ReadShareGroupStateSummaryResponseData
readShareGroupStateSummaryResponseData = new
ReadShareGroupStateSummaryResponseData()
+ .setResults(
+ List.of(new
ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
ReadShareGroupStateSummaryResponseData.PartitionResult()
+ .setPartition(partition)
+ .setStartOffset(21)
+ .setStateEpoch(1)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(Errors.NONE.message())))
+ )
+ );
+
+ ReadShareGroupStateSummaryParameters
readShareGroupStateSummaryParameters =
ReadShareGroupStateSummaryParameters.from(readShareGroupStateSummaryRequestData);
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
ReadShareGroupStateSummaryResult.from(readShareGroupStateSummaryResponseData);
+ when(persister.readSummary(
+ ArgumentMatchers.eq(readShareGroupStateSummaryParameters)
+
)).thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterThrowsError()
{
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.failedFuture(new Exception("Unable
to validate read state summary request")));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(future, Exception.class, "Unable to validate read
state summary request");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsWithDefaultPersisterNullResult() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(future, IllegalStateException.class, "Result is
null for the read state summary");
+ }
+
+ @Test
+ public void
testDescribeShareGroupOffsetsWithDefaultPersisterNullTopicData() {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ ReadShareGroupStateSummaryResult readShareGroupStateSummaryResult =
+ new
ReadShareGroupStateSummaryResult.Builder().setTopicsData(null).build();
+
+ when(persister.readSummary(ArgumentMatchers.any()))
+
.thenReturn(CompletableFuture.completedFuture(readShareGroupStateSummaryResult));
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+ assertFutureThrows(future, IllegalStateException.class, "Result is
null for the read state summary");
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(0)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+
.setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message())))
+ )
+ );
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDescribeShareGroupOffsetsMetadataImageNull() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build(true);
+
+ // Forcing a null Metadata Image
+ service.onNewMetadataImage(null, null);
+
+ int partition = 1;
+ DescribeShareGroupOffsetsRequestData requestData = new
DescribeShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DescribeShareGroupOffsetsResponseData responseData = new
DescribeShareGroupOffsetsResponseData()
+ .setResponses(
+ List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setStartOffset(0)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+ )
+ );
+
+ CompletableFuture<DescribeShareGroupOffsetsResponseData> future =
+
service.describeShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @FunctionalInterface
+ interface TriFunction<A, B, C, R> {
+ R apply(A a, B b, C c);
+ }
+
+ public static class GroupCoordinatorServiceBuilder {
Review Comment:
```suggestion
private static class GroupCoordinatorServiceBuilder {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]