This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 270948bf9d7 KAFKA-19115: Utilize initialized topics info to verify
delete share group offsets (#19431)
270948bf9d7 is described below
commit 270948bf9d70a8913792259d7801c33c6a7a3235
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Apr 14 16:40:40 2025 +0530
KAFKA-19115: Utilize initialized topics info to verify delete share group
offsets (#19431)
Currently, in the deleteShareGroupOffsets method in
GroupCoordinatorService, the user request was simply forwarded to the
persister without checking if the requested share partitions were
initialized for the group or not. This PR introduces such a check to
make sure that the persister deleteState request is only called for
share partitions that have been initialized for the group.
Reviewers: Andrew Schofield <[email protected]>, Sushant Mahajan
<[email protected]>
---
.../coordinator/group/GroupCoordinatorService.java | 168 +++----
.../coordinator/group/GroupCoordinatorShard.java | 118 +++++
.../coordinator/group/GroupMetadataManager.java | 49 ++
.../group/GroupCoordinatorServiceTest.java | 515 ++++++++++++++++-----
.../group/GroupCoordinatorShardTest.java | 302 ++++++++++++
.../group/GroupMetadataManagerTest.java | 359 ++++++++++++++
6 files changed, 1290 insertions(+), 221 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index c00502af48f..757bba87263 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -28,7 +28,6 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
-import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
@@ -1258,45 +1257,40 @@ public class GroupCoordinatorService implements
GroupCoordinator {
});
}
- private void populateDeleteShareGroupOffsetsFuture(
- DeleteShareGroupOffsetsRequestData requestData,
- CompletableFuture<DeleteShareGroupOffsetsResponseData> future,
- Map<Uuid, String> requestTopicIdToNameMapping,
- List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData,
-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList
-
+ private CompletableFuture<DeleteShareGroupOffsetsResponseData>
persistDeleteShareGroupOffsets(
+ DeleteShareGroupStateParameters deleteStateRequestParameters,
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList
) {
- DeleteShareGroupStateRequestData deleteShareGroupStateRequestData =
new DeleteShareGroupStateRequestData()
- .setGroupId(requestData.groupId())
- .setTopics(deleteShareGroupStateRequestTopicsData);
-
-
persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData))
- .whenComplete((result, error) -> {
- if (error != null) {
- log.error("Failed to delete share group state");
- future.completeExceptionally(error);
- return;
- }
+ return persister.deleteState(deleteStateRequestParameters)
+ .thenCompose(result -> {
if (result == null || result.topicsData() == null) {
log.error("Result is null for the delete share group
state");
- future.completeExceptionally(new
IllegalStateException("Result is null for the delete share group state"));
- return;
+ Exception exception = new IllegalStateException("Result is
null for the delete share group state");
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(exception))
+ );
}
result.topicsData().forEach(topicData ->
- deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
- .setTopicId(topicData.topicId())
-
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
- .setPartitions(topicData.partitions().stream().map(
- partitionData -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partitionData.partition())
- .setErrorMessage(partitionData.errorCode() ==
Errors.NONE.code() ? null : Errors.forCode(partitionData.errorCode()).message())
- .setErrorCode(partitionData.errorCode())
- ).toList())
- ));
+ errorTopicResponseList.add(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicId(topicData.topicId())
+
.setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+ .setPartitions(topicData.partitions().stream().map(
+ partitionData -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+
.setPartitionIndex(partitionData.partition())
+ .setErrorMessage(partitionData.errorCode()
== Errors.NONE.code() ? null :
Errors.forCode(partitionData.errorCode()).message())
+ .setErrorCode(partitionData.errorCode())
+ ).toList())
+ )
+ );
- future.complete(
+ return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
-
.setResponses(deleteShareGroupOffsetsResponseTopicList));
+ .setResponses(errorTopicResponseList)
+ );
+ }).exceptionally(throwable -> {
+ log.error("Failed to delete share group state");
+ return
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
});
}
@@ -1590,83 +1584,53 @@ public class GroupCoordinatorService implements
GroupCoordinator {
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
}
- Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
- List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new
ArrayList<>(requestData.topics().size());
-
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
deleteShareGroupOffsetsResponseTopicList = new
ArrayList<>(requestData.topics().size());
-
- requestData.topics().forEach(topic -> {
- Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
- if (topicId != null) {
- requestTopicIdToNameMapping.put(topicId, topic.topicName());
- deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
- .setTopicId(topicId)
- .setPartitions(
- topic.partitions().stream().map(
- partitionIndex -> new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
- ).toList()
- ));
- } else {
- deleteShareGroupOffsetsResponseTopicList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
- .setTopicName(topic.topicName())
- .setPartitions(topic.partitions().stream().map(
- partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
-
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
- ).toList()));
- }
- });
-
- // If the request for the persister is empty, just complete the
operation right away.
- if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ if (requestData.topics() == null || requestData.topics().isEmpty()) {
return CompletableFuture.completedFuture(
new DeleteShareGroupOffsetsResponseData()
- .setResponses(deleteShareGroupOffsetsResponseTopicList));
+ );
}
- CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new
CompletableFuture<>();
+ return runtime.scheduleReadOperation(
+ "share-group-delete-offsets-request",
+ topicPartitionFor(groupId),
+ (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData)
+ )
+ .thenCompose(resultHolder -> {
+ if (resultHolder == null) {
+ log.error("Failed to retrieve deleteState request
parameters from group coordinator for the group {}", groupId);
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.UNKNOWN_SERVER_ERROR)
+ );
+ }
+
+ if (resultHolder.topLevelErrorCode() != Errors.NONE.code()) {
+ return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(
+ resultHolder.topLevelErrorCode(),
+ resultHolder.topLevelErrorMessage()
+ )
+ );
+ }
- TopicPartition topicPartition = topicPartitionFor(groupId);
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList =
+ resultHolder.errorTopicResponseList() == null ? new
ArrayList<>() : new ArrayList<>(resultHolder.errorTopicResponseList());
- // This is done to make sure the provided group is empty. Offsets can
be deleted only for an empty share group.
- CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
describeGroupFuture =
- runtime.scheduleReadOperation(
- "share-group-describe",
- topicPartition,
- (coordinator, lastCommittedOffset) ->
coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
- ).exceptionally(exception -> handleOperationException(
- "share-group-describe",
- List.of(groupId),
- exception,
- (error, __) ->
ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
- log
- ));
+ if (resultHolder.deleteStateRequestParameters() == null) {
+ return CompletableFuture.completedFuture(
+ new DeleteShareGroupOffsetsResponseData()
+ .setResponses(errorTopicResponseList)
+ );
+ }
- describeGroupFuture.whenComplete((groups, throwable) -> {
- if (throwable != null) {
- log.error("Failed to describe the share group {}", groupId,
throwable);
-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)));
- } else if (groups == null || groups.isEmpty()) {
- log.error("Describe share group resulted in empty response for
group {}", groupId);
-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
- } else if (groups.get(0).errorCode() != Errors.NONE.code()) {
- log.error("Failed to describe the share group {}", groupId);
-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(groups.get(0).errorCode(),
groups.get(0).errorMessage()));
- } else if (groups.get(0).members() != null &&
!groups.get(0).members().isEmpty()) {
- log.error("Provided group {} is not empty", groupId);
-
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.NON_EMPTY_GROUP));
- } else {
- populateDeleteShareGroupOffsetsFuture(
- requestData,
- future,
- requestTopicIdToNameMapping,
- deleteShareGroupStateRequestTopicsData,
- deleteShareGroupOffsetsResponseTopicList
+ return persistDeleteShareGroupOffsets(
+ resultHolder.deleteStateRequestParameters(),
+ errorTopicResponseList
);
- }
- });
-
- return future;
+ })
+ .exceptionally(throwable -> {
+ log.error("Failed to retrieve deleteState request parameters
from group coordinator for the group {}", groupId, throwable);
+ return
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable));
+ });
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 812641c9459..7f9fd576fd5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -26,6 +26,9 @@ import
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -131,6 +134,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -290,6 +294,69 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
}
}
+ public static class DeleteShareGroupOffsetsResultHolder {
+ private final short topLevelErrorCode;
+ private final String topLevelErrorMessage;
+ private final
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList;
+ private final DeleteShareGroupStateParameters
deleteStateRequestParameters;
+
+ DeleteShareGroupOffsetsResultHolder(short topLevelErrorCode, String
topLevelErrorMessage) {
+ this(topLevelErrorCode, topLevelErrorMessage, null, null);
+ }
+
+ DeleteShareGroupOffsetsResultHolder(
+ short topLevelErrorCode,
+ String topLevelErrorMessage,
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList
+ ) {
+ this(topLevelErrorCode, topLevelErrorMessage,
errorTopicResponseList, null);
+ }
+
+ DeleteShareGroupOffsetsResultHolder(
+ short topLevelErrorCode,
+ String topLevelErrorMessage,
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList,
+ DeleteShareGroupStateParameters deleteStateRequestParameters
+ ) {
+ this.topLevelErrorCode = topLevelErrorCode;
+ this.topLevelErrorMessage = topLevelErrorMessage;
+ this.errorTopicResponseList = errorTopicResponseList;
+ this.deleteStateRequestParameters = deleteStateRequestParameters;
+ }
+
+ public short topLevelErrorCode() {
+ return this.topLevelErrorCode;
+ }
+
+ public String topLevelErrorMessage() {
+ return this.topLevelErrorMessage;
+ }
+
+ public
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList() {
+ return this.errorTopicResponseList;
+ }
+
+ public DeleteShareGroupStateParameters deleteStateRequestParameters() {
+ return this.deleteStateRequestParameters;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DeleteShareGroupOffsetsResultHolder other =
(DeleteShareGroupOffsetsResultHolder) o;
+ return topLevelErrorCode == other.topLevelErrorCode &&
+ Objects.equals(topLevelErrorMessage,
other.topLevelErrorMessage) &&
+ Objects.equals(errorTopicResponseList,
other.errorTopicResponseList) &&
+ Objects.equals(deleteStateRequestParameters,
other.deleteStateRequestParameters);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(topLevelErrorCode, topLevelErrorMessage,
errorTopicResponseList, deleteStateRequestParameters);
+ }
+ }
+
/**
* The group/offsets expiration key to schedule a timer task.
*
@@ -613,6 +680,57 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return new CoordinatorResult<>(records, responseMap);
}
+ /**
+ * Does the following checks to make sure that a DeleteShareGroupOffsets
request is valid and can be processed further
+ * 1. Checks whether the provided group is empty
+ * 2. Checks the requested topics are presented in the metadataImage
+ * 3. Checks the requested share partitions are initialized for the group
+ *
+ * @param groupId - The group ID
+ * @param requestData - The request data for DeleteShareGroupOffsetsRequest
+ * @return {@link DeleteShareGroupOffsetsResultHolder} an object
containing top level error code, list of topic responses
+ * and persister deleteState
request parameters
+ */
+ public DeleteShareGroupOffsetsResultHolder shareGroupDeleteOffsetsRequest(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData
+ ) {
+ try {
+ ShareGroup group = groupMetadataManager.shareGroup(groupId);
+ group.validateDeleteGroup();
+
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData =
+ groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(
+ groupId,
+ requestData,
+ errorTopicResponseList
+ );
+
+ if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+ return new
DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(), null,
errorTopicResponseList);
+ }
+
+ DeleteShareGroupStateRequestData deleteShareGroupStateRequestData
= new DeleteShareGroupStateRequestData()
+ .setGroupId(requestData.groupId())
+ .setTopics(deleteShareGroupStateRequestTopicsData);
+
+ return new DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ errorTopicResponseList,
+
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+ );
+
+ } catch (GroupIdNotFoundException exception) {
+ log.error("groupId {} not found", groupId, exception);
+ return new
DeleteShareGroupOffsetsResultHolder(Errors.GROUP_ID_NOT_FOUND.code(),
exception.getMessage());
+ } catch (GroupNotEmptyException exception) {
+ log.error("Provided group {} is not empty", groupId);
+ return new
DeleteShareGroupOffsetsResultHolder(Errors.NON_EMPTY_GROUP.code(),
exception.getMessage());
+ }
+ }
+
/**
* Fetch offsets for a given set of partitions and a given group.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f8a114d999f..2e2b02f9432 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -41,6 +41,9 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -8195,6 +8198,52 @@ public class GroupMetadataManager {
);
}
+ /**
+ * Returns a list of delete share group state request topic objects to be
used with the persister.
+ * @param groupId - group ID of the share group
+ * @param requestData - the request data for DeleteShareGroupOffsets
request
+ * @param errorTopicResponseList - the list of topics not found in the
metadata image
+ * @return List of objects representing the share group state delete
request for topics.
+ */
+ public List<DeleteShareGroupStateRequestData.DeleteStateData>
sharePartitionsEligibleForOffsetDeletion(
+ String groupId,
+ DeleteShareGroupOffsetsRequestData requestData,
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList
+ ) {
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData = new ArrayList<>();
+
+ Map<Uuid, Set<Integer>> initializedSharePartitions =
initializedShareGroupPartitions(groupId);
+ requestData.topics().forEach(topic -> {
+ Uuid topicId =
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+ if (topicId != null) {
+ // A deleteState request to persister should only be sent with
those topic partitions for which corresponding
+ // share partitions are initialized for the group.
+ if (initializedSharePartitions.containsKey(topicId)) {
+ List<DeleteShareGroupStateRequestData.PartitionData>
partitions = new ArrayList<>();
+ topic.partitions().forEach(partition -> {
+ if
(initializedSharePartitions.get(topicId).contains(partition)) {
+ partitions.add(new
DeleteShareGroupStateRequestData.PartitionData().setPartition(partition));
+ }
+ });
+ deleteShareGroupStateRequestTopicsData.add(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId)
+ .setPartitions(partitions));
+ }
+ } else {
+ errorTopicResponseList.add(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topic.topicName())
+ .setPartitions(topic.partitions().stream().map(
+ partition -> new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ).collect(Collectors.toCollection(ArrayList::new))));
+ }
+ });
+
+ return deleteShareGroupStateRequestTopicsData;
+ }
+
/**
* Validates the DeleteGroups request.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 6b117a0ec45..01e74ea558a 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -121,7 +121,6 @@ import org.mockito.ArgumentMatchers;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -3298,9 +3297,11 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
@@ -3317,14 +3318,30 @@ public class GroupCoordinatorServiceTest {
.setErrorMessage(null))))
);
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setGroupId("share-group-id-1");
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(),
+ DeleteShareGroupStateParameters.from(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ))
+ )
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3343,16 +3360,18 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupStateRequestData deleteShareGroupStateRequestData =
new DeleteShareGroupStateRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(TOPIC_ID)
.setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
@@ -3380,14 +3399,19 @@ public class GroupCoordinatorServiceTest {
)
);
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setGroupId("share-group-id-1");
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(),
+
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
DeleteShareGroupStateParameters deleteShareGroupStateParameters =
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
DeleteShareGroupStateResult deleteShareGroupStateResult =
DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
@@ -3402,33 +3426,53 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void
testDeleteShareGroupOffsetsNonexistentTopicWithDefaultPersister() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .build();
+
+ int partition = 1;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id")
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition))
+ ));
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
+
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsMetadataImageNull() throws
ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
- Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
- .setPersister(persister)
.build(true);
- service.startup(() -> 1);
+
+ // Forcing a null Metadata Image
+ service.onNewMetadataImage(null, null);
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
- .setTopicName("badtopic")
+ .setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setResponses(
- List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
- .setTopicName("badtopic")
- .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
- .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
-
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))
- );
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3437,7 +3481,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError() {
+ public void testDeleteShareGroupOffsetsInvalidGroupId() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3449,31 +3493,46 @@ public class GroupCoordinatorServiceTest {
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId("")
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setGroupId("share-group-id-1");
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setErrorMessage(Errors.INVALID_GROUP_ID.message());
- when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
- ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
- ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+ CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
+
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
- when(persister.deleteState(ArgumentMatchers.any()))
- .thenReturn(CompletableFuture.failedFuture(new Exception("Unable
to validate delete share group state request")));
+ assertEquals(responseData, future.get());
+ }
+
+ @Test
+ public void testDeleteShareGroupOffsetsEmptyRequest() throws
InterruptedException, ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId("share-group-id");
+
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData();
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
- assertFutureThrows(Exception.class, future, "Unable to validate delete
share group state request");
+
+ assertEquals(responseData, future.get());
}
@Test
- public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult() {
+ public void testDeleteShareGroupOffsetsRequestThrowsError() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3483,33 +3542,34 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setGroupId("share-group-id-1");
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
-
- when(persister.deleteState(ArgumentMatchers.any()))
- .thenReturn(CompletableFuture.completedFuture(null));
+
)).thenReturn(CompletableFuture.completedFuture(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception())));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
- assertFutureThrows(IllegalStateException.class, future, "Result is
null for the delete share group state");
+
+ assertEquals(responseData, future.get());
}
@Test
- public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData()
{
+ public void testDeleteShareGroupOffsetsRequestReturnsNull() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3519,53 +3579,70 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setGroupId("share-group-id-1");
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
-
- DeleteShareGroupStateResult deleteShareGroupStateResult =
- new
DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
-
- when(persister.deleteState(ArgumentMatchers.any()))
-
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
+ )).thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
- assertFutureThrows(IllegalStateException.class, future, "Result is
null for the delete share group state");
+
+ assertEquals(responseData, future.get());
}
@Test
- public void testDeleteShareGroupOffsetsCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ public void testDeleteShareGroupOffsetsRequestReturnsGroupIdNotFound()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
- .build();
+ .setPersister(persister)
+ .build(true);
+ service.startup(() -> 1);
+
+ String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.GROUP_ID_NOT_FOUND.code(),
+ Errors.GROUP_ID_NOT_FOUND.message(),
+ null,
+ null
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3574,27 +3651,43 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsMetadataImageNull() throws
ExecutionException, InterruptedException {
+ public void testDeleteShareGroupOffsetsRequestReturnsGroupNotEmpty()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(runtime)
+ .setPersister(persister)
.build(true);
+ service.startup(() -> 1);
- // Forcing a null Metadata Image
- service.onNewMetadataImage(null, null);
+ String groupId = "share-group-id";
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
- .setErrorMessage(Errors.COORDINATOR_NOT_AVAILABLE.message());
+ .setErrorCode(Errors.NON_EMPTY_GROUP.code())
+ .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NON_EMPTY_GROUP.code(),
+ Errors.NON_EMPTY_GROUP.message(),
+ null,
+ null
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3603,7 +3696,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsInvalidGroupId() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsRequestReturnsNullParameters()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3613,17 +3706,31 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
- DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.INVALID_GROUP_ID.code())
- .setErrorMessage(Errors.INVALID_GROUP_ID.message());
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData();
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ null,
+ null
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3632,7 +3739,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsDescribeThrowsError() throws
InterruptedException, ExecutionException {
+ public void
testDeleteShareGroupOffsetsRequestReturnsNullParametersWithErrorTopics() throws
InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3642,23 +3749,52 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String badTopicName = "bad-topic";
+ Uuid badTopicId = Uuid.randomUuid();
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
- .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
- .setTopicName(TOPIC_NAME)
- .setPartitions(List.of(partition))
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(badTopicName)
+ .setPartitions(List.of(partition))
));
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
- .setErrorMessage(Errors.UNKNOWN_SERVER_ERROR.message());
+ .setResponses(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(badTopicName)
+ .setTopicId(badTopicId)
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ))));
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(badTopicName)
+ .setTopicId(badTopicId)
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ))),
+ null
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(FutureUtils.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3667,7 +3803,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsDescribeReturnsNull() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsWithDefaultPersisterThrowsError()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3677,23 +3813,49 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
+ Exception persisterException = new Exception("Unable to validate
delete share group state request");
+
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
- .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+ .setErrorCode(Errors.forException(persisterException).code())
+
.setErrorMessage(Errors.forException(persisterException).message());
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(),
+ DeleteShareGroupStateParameters.from(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ))
+ )
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
- )).thenReturn(CompletableFuture.completedFuture(null));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+ when(persister.deleteState(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.failedFuture(persisterException));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3702,7 +3864,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsDescribeReturnsEmpty() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsWithDefaultPersisterNullResult()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3712,23 +3874,49 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
+ Exception persisterException = new IllegalStateException("Result is
null for the delete share group state");
+
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
- .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+ .setErrorCode(Errors.forException(persisterException).code())
+
.setErrorMessage(Errors.forException(persisterException).message());
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(),
+ DeleteShareGroupStateParameters.from(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ))
+ )
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+ when(persister.deleteState(ArgumentMatchers.any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3737,7 +3925,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsDescribeReturnsError() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsWithDefaultPersisterNullTopicData()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3747,27 +3935,52 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String groupId = "share-group-id";
+
int partition = 1;
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
+ .setGroupId(groupId)
.setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
.setTopicName(TOPIC_NAME)
.setPartitions(List.of(partition))
));
- DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
- .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+ Exception persisterException = new IllegalStateException("Result is
null for the delete share group state");
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
- .setErrorMessage(Errors.GROUP_ID_NOT_FOUND.message());
+ DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
+ .setErrorCode(Errors.forException(persisterException).code())
+
.setErrorMessage(Errors.forException(persisterException).message());
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(),
+ DeleteShareGroupStateParameters.from(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(
+ new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ ))
+ )
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+ DeleteShareGroupStateResult deleteShareGroupStateResult =
+ new
DeleteShareGroupStateResult.Builder().setTopicsData(null).build();
+
+ when(persister.deleteState(ArgumentMatchers.any()))
+
.thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
@@ -3776,7 +3989,7 @@ public class GroupCoordinatorServiceTest {
}
@Test
- public void testDeleteShareGroupOffsetsGroupIsNotEmpty() throws
InterruptedException, ExecutionException {
+ public void testDeleteShareGroupOffsetsSuccessWithErrorTopicPartitions()
throws InterruptedException, ExecutionException {
CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
Persister persister = mock(DefaultStatePersister.class);
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
@@ -3786,27 +3999,91 @@ public class GroupCoordinatorServiceTest {
.build(true);
service.startup(() -> 1);
+ String badTopicName = "bad-topic";
+ Uuid badTopicId = Uuid.randomUuid();
+ String groupId = "share-group-id";
+
int partition = 1;
+
DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
- .setGroupId("share-group-id")
- .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
- .setTopicName(TOPIC_NAME)
- .setPartitions(List.of(partition))
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(TOPIC_NAME)
+ .setPartitions(List.of(partition)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(badTopicName)
+ .setPartitions(List.of(partition))
));
+ DeleteShareGroupStateRequestData deleteShareGroupStateRequestData =
new DeleteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)))));
+
DeleteShareGroupOffsetsResponseData responseData = new
DeleteShareGroupOffsetsResponseData()
- .setErrorCode(Errors.NON_EMPTY_GROUP.code())
- .setErrorMessage(Errors.NON_EMPTY_GROUP.message());
+ .setResponses(
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(badTopicName)
+ .setTopicId(badTopicId)
+ .setPartitions(List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ )),
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(TOPIC_NAME)
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)
+ ))
+ )
+ );
- ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
- .setGroupId("share-group-id-1")
- .setMembers(List.of(new ShareGroupDescribeResponseData.Member()));
+ DeleteShareGroupStateResponseData deleteShareGroupStateResponseData =
new DeleteShareGroupStateResponseData()
+ .setResults(
+ List.of(new
DeleteShareGroupStateResponseData.DeleteStateResult()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new
DeleteShareGroupStateResponseData.PartitionResult()
+ .setPartition(partition)
+ .setErrorCode(Errors.NONE.code())
+ .setErrorMessage(null)))
+ )
+ );
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
deleteShareGroupOffsetsResultHolder =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(badTopicName)
+ .setTopicId(badTopicId)
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ))),
+
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)
+ );
when(runtime.scheduleReadOperation(
- ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq("share-group-delete-offsets-request"),
ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
ArgumentMatchers.any()
-
)).thenReturn(CompletableFuture.completedFuture(List.of(describedGroup)));
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupOffsetsResultHolder));
+
+ DeleteShareGroupStateParameters deleteShareGroupStateParameters =
DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData);
+ DeleteShareGroupStateResult deleteShareGroupStateResult =
DeleteShareGroupStateResult.from(deleteShareGroupStateResponseData);
+ when(persister.deleteState(
+ ArgumentMatchers.eq(deleteShareGroupStateParameters)
+
)).thenReturn(CompletableFuture.completedFuture(deleteShareGroupStateResult));
CompletableFuture<DeleteShareGroupOffsetsResponseData> future =
service.deleteShareGroupOffsets(requestContext(ApiKeys.DELETE_SHARE_GROUP_OFFSETS),
requestData);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 16665f98bf2..25af1ca34a6 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -23,6 +23,9 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
@@ -124,6 +127,7 @@ import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -1949,4 +1953,302 @@ public class GroupCoordinatorShardTest {
verify(groupMetadataManager, times(0)).group(eq("share-group"));
verify(groupMetadataManager,
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
}
+
+ @Test
+ public void testShareGroupDeleteOffsetsRequestGroupNotFound() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ String groupId = "share-group";
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName("topic-1")
+ .setPartitions(List.of(0))
+ ));
+
+ GroupIdNotFoundException exception = new
GroupIdNotFoundException("group Id not found");
+
+ doThrow(exception).when(groupMetadataManager).shareGroup(eq(groupId));
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
expectedResult =
+ new
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(),
exception.getMessage());
+
+ assertEquals(expectedResult,
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+ // Not called because of Group not found.
+ verify(groupMetadataManager,
times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+ }
+
+ @Test
+ public void testShareGroupDeleteOffsetsRequestNonEmptyShareGroup() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ String groupId = "share-group";
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName("topic-1")
+ .setPartitions(List.of(0))
+ ));
+
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ GroupNotEmptyException exception = new GroupNotEmptyException("group
is not empty");
+ doThrow(exception).when(shareGroup).validateDeleteGroup();
+
+
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
expectedResult =
+ new
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.forException(exception).code(),
exception.getMessage());
+
+ assertEquals(expectedResult,
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+ // Not called because of Group not found.
+ verify(groupMetadataManager,
times(0)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+ }
+
+ @Test
+ public void testShareGroupDeleteOffsetsRequestEmptyResult() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ String groupId = "share-group";
+ String topicName = "topic-1";
+ Uuid topicId = Uuid.randomUuid();
+ int partition = 0;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName)
+ .setPartitions(List.of(partition))
+ ));
+
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ doNothing().when(shareGroup).validateDeleteGroup();
+
+
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName)
+ .setTopicId(topicId)
+ .setPartitions(List.of(new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+ .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())))
+ );
+
+
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId),
eq(requestData), any()))
+ .thenAnswer(invocation -> {
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
inputList = invocation.getArgument(2);
+ inputList.addAll(errorTopicResponseList);
+ return List.of();
+ });
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
expectedResult =
+ new
GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(Errors.NONE.code(),
null, errorTopicResponseList);
+
+ assertEquals(expectedResult,
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+ verify(groupMetadataManager,
times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+ }
+
+ @Test
+ public void testShareGroupDeleteOffsetsRequestSuccess() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ String groupId = "share-group";
+ String topicName1 = "topic-1";
+ Uuid topicId1 = Uuid.randomUuid();
+ String topicName2 = "topic-2";
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition = 0;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(List.of(partition)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(partition))
+ ));
+
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ doNothing().when(shareGroup).validateDeleteGroup();
+
+
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData =
+ List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ )),
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ );
+
+
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId),
eq(requestData), any()))
+ .thenReturn(deleteShareGroupStateRequestTopicsData);
+
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
expectedResult =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ List.of(),
+ DeleteShareGroupStateParameters.from(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(requestData.groupId())
+ .setTopics(deleteShareGroupStateRequestTopicsData)
+ ));
+
+ assertEquals(expectedResult,
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+ verify(groupMetadataManager,
times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+ }
+
+ @Test
+ public void testShareGroupDeleteOffsetsRequestSuccessWithErrorTopics() {
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ OffsetMetadataManager offsetMetadataManager =
mock(OffsetMetadataManager.class);
+ CoordinatorMetrics coordinatorMetrics = mock(CoordinatorMetrics.class);
+ CoordinatorMetricsShard metricsShard =
mock(CoordinatorMetricsShard.class);
+ GroupCoordinatorShard coordinator = new GroupCoordinatorShard(
+ new LogContext(),
+ groupMetadataManager,
+ offsetMetadataManager,
+ Time.SYSTEM,
+ new MockCoordinatorTimer<>(Time.SYSTEM),
+ mock(GroupCoordinatorConfig.class),
+ coordinatorMetrics,
+ metricsShard
+ );
+
+ String groupId = "share-group";
+ String topicName1 = "topic-1";
+ Uuid topicId1 = Uuid.randomUuid();
+ String topicName2 = "topic-2";
+ Uuid topicId2 = Uuid.randomUuid();
+ int partition = 0;
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(List.of(partition)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(partition))
+ ));
+
+ ShareGroup shareGroup = mock(ShareGroup.class);
+ doNothing().when(shareGroup).validateDeleteGroup();
+
+
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData>
deleteShareGroupStateRequestTopicsData =
+ List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(partition)
+ ))
+ );
+
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList =
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ))
+ );
+
+
when(groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(eq(groupId),
eq(requestData), any()))
+ .thenAnswer(invocation -> {
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
inputList = invocation.getArgument(2);
+
+ inputList.addAll(errorTopicResponseList);
+ return deleteShareGroupStateRequestTopicsData;
+ });
+
+
+ GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder
expectedResult =
+ new GroupCoordinatorShard.DeleteShareGroupOffsetsResultHolder(
+ Errors.NONE.code(),
+ null,
+ errorTopicResponseList,
+ DeleteShareGroupStateParameters.from(
+ new DeleteShareGroupStateRequestData()
+ .setGroupId(requestData.groupId())
+ .setTopics(deleteShareGroupStateRequestTopicsData)
+ ));
+
+ assertEquals(expectedResult,
coordinator.shareGroupDeleteOffsetsRequest(groupId, requestData));
+ verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
+ verify(groupMetadataManager,
times(1)).sharePartitionsEligibleForOffsetDeletion(any(), any(), any());
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 35574d35ca3..4665e81e9be 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -43,6 +43,9 @@ import
org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
+import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -20804,6 +20807,362 @@ public class GroupMetadataManagerTest {
assertRecordsEquals(expectedRecords, records);
}
+ @Test
+ public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build();
+
+ String groupId = "share-group";
+ Uuid memberId = Uuid.randomUuid();
+ String topicName1 = "topic-1";
+ String topicName2 = "topic-2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(topicId1, topicName1, 3)
+ .addTopic(topicId2, topicName2, 2)
+ .build();
+
+ context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId.toString())
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(topicName1, topicName2)));
+
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of())
+ .setInitializedTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId1)
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2)),
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId2)
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1))
+ ))
+ .setDeletingTopics(List.of())
+ );
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult
= List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(2)
+ )),
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId2)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1)
+ ))
+ );
+
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1))
+ ));
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId,
requestData, errorTopicResponseList);
+
+ assertTrue(errorTopicResponseList.isEmpty());
+ assertEquals(expectedResult, result);
+ }
+
+ @Test
+ public void testSharePartitionsEligibleForOffsetDeletionErrorTopics() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build();
+
+ String groupId = "share-group";
+ Uuid memberId = Uuid.randomUuid();
+ String topicName1 = "topic-1";
+ String topicName2 = "topic-2";
+ Uuid topicId1 = Uuid.randomUuid();
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(topicId1, topicName1, 3)
+ .build();
+
+ context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId.toString())
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(topicName1)));
+
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializingTopics(List.of())
+ .setInitializedTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId1)
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2))
+ ))
+ .setDeletingTopics(List.of())
+ );
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult
= List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(2)
+ ))
+ );
+
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1))
+ ));
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId,
requestData, errorTopicResponseList);
+
+ assertEquals(
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ))
+ ),
+ errorTopicResponseList
+ );
+ assertEquals(expectedResult, result);
+ }
+
+ @Test
+ public void
testSharePartitionsEligibleForOffsetDeletionUninitializedTopics() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build();
+
+ String groupId = "share-group";
+ Uuid memberId = Uuid.randomUuid();
+ String topicName1 = "topic-1";
+ String topicName2 = "topic-2";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(topicId1, topicName1, 3)
+ .addTopic(topicId2, topicName2, 2)
+ .build();
+
+ context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId.toString())
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(topicName1, topicName2)));
+
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializedTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId1)
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2))
+ ))
+ .setInitializingTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId2)
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1))
+ ))
+ .setDeletingTopics(List.of())
+ );
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult
= List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(2)
+ ))
+ );
+
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1))
+ ));
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId,
requestData, errorTopicResponseList);
+
+ assertTrue(errorTopicResponseList.isEmpty());
+ assertEquals(expectedResult, result);
+ }
+
+ @Test
+ public void
testSharePartitionsEligibleForOffsetDeletionUninitializedAndErrorTopics() {
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build();
+
+ String groupId = "share-group";
+ Uuid memberId = Uuid.randomUuid();
+ String topicName1 = "topic-1";
+ String topicName2 = "topic-2";
+ String topicName3 = "topic-3";
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+
+ MetadataImage image = new MetadataImageBuilder()
+ .addTopic(topicId1, topicName1, 3)
+ .addTopic(topicId2, topicName2, 2)
+ .build();
+
+ context.groupMetadataManager.onNewMetadataImage(image,
mock(MetadataDelta.class));
+
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId.toString())
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(List.of(topicName1, topicName2)));
+
+ context.groupMetadataManager.replay(
+ new ShareGroupStatePartitionMetadataKey()
+ .setGroupId(groupId),
+ new ShareGroupStatePartitionMetadataValue()
+ .setInitializedTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId1)
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2))
+ ))
+ .setInitializingTopics(List.of(
+ new
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+ .setTopicId(topicId2)
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1))
+ ))
+ .setDeletingTopics(List.of())
+ );
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> expectedResult
= List.of(
+ new DeleteShareGroupStateRequestData.DeleteStateData()
+ .setTopicId(topicId1)
+ .setPartitions(List.of(
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(0),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(1),
+ new DeleteShareGroupStateRequestData.PartitionData()
+ .setPartition(2)
+ ))
+ );
+
+ DeleteShareGroupOffsetsRequestData requestData = new
DeleteShareGroupOffsetsRequestData()
+ .setGroupId(groupId)
+ .setTopics(List.of(
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName1)
+ .setPartitions(List.of(0, 1, 2)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName2)
+ .setPartitions(List.of(0, 1)),
+ new
DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
+ .setTopicName(topicName3)
+ .setPartitions(List.of(0, 1))
+ ));
+
List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic>
errorTopicResponseList = new ArrayList<>();
+
+ List<DeleteShareGroupStateRequestData.DeleteStateData> result =
+
context.groupMetadataManager.sharePartitionsEligibleForOffsetDeletion(groupId,
requestData, errorTopicResponseList);
+
+ assertEquals(
+ List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+ .setTopicName(topicName3)
+ .setPartitions(List.of(
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(0)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()),
+ new
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+ .setPartitionIndex(1)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+ ))
+ ),
+ errorTopicResponseList
+ );
+ assertEquals(expectedResult, result);
+ }
+
@Test
public void testShareGroupHeartbeatInitializeOnPartitionUpdate() {
MockPartitionAssignor assignor = new MockPartitionAssignor("range");