dajac commented on code in PR #18848:
URL: https://github.com/apache/kafka/pull/18848#discussion_r1963519484
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6855,6 +6861,45 @@ public void createGroupTombstoneRecords(
group.createGroupTombstoneRecords(records);
}
+ /**
+ * Returns an optional of delete share group request object to be used
with the persister.
+ * Empty if no subscribed topics or if the share group is empty.
+ * @param shareGroup - A share group
+ * @return Optional of object representing the share group state delete
request.
+ */
+ public Optional<DeleteShareGroupStateParameters>
shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) {
+ TopicsImage topicsImage = metadataImage.topics();
+ Set<String> subscribedTopics =
shareGroup.subscribedTopicNames().keySet();
Review Comment:
For my understanding, this set is always empty given that we require the
group to be empty, right? This will be addressed in a follow-up PR if I
understood correctly.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ ));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(Map.of())); //
non-share group
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = Arrays.asList("share-group-id-1",
"group-id-2", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
Review Comment:
nit: This is not necessary because the exception will be caught anyway by
the test running.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ ));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(Map.of())); //
non-share group
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = Arrays.asList("share-group-id-1",
"group-id-2", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(1)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupPersisterError() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group success
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ result1.duplicate(),
+ result2.duplicate()));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+ Uuid shareGroupTopicId2 = Uuid.randomUuid();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-2",
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2,
List.of(0, 1)), Errors.NONE))
+ ));
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+ ))
+ ))
+ .build()
+ )).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId2,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // share-group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = List.of("share-group-id-1",
"share-group-id-2");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(2)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupCoordinatorShareSpecificWriteError()
throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(
+ result1.duplicate()
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.failedFuture(
+ Errors.COORDINATOR_NOT_AVAILABLE.exception()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ List<String> groupIds = List.of("share-group-id-1");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(0)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupNotEmptyError() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.forException(new GroupNotEmptyException("bad
stuff")).code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(
+ result1.duplicate()
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1", Map.entry(EMPTY_PARAMS,
Errors.forException(new GroupNotEmptyException("bad stuff"))))
+ ));
+
+ List<String> groupIds = List.of("share-group-id-1");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -846,6 +861,152 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ List<String> deleteCandidateGroupIds(
Review Comment:
Should it be private? If we use it from tests, let's add `// package-private
for testing` please.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ ));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(Map.of())); //
non-share group
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = Arrays.asList("share-group-id-1",
"group-id-2", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(1)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupPersisterError() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group success
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ result1.duplicate(),
+ result2.duplicate()));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+ Uuid shareGroupTopicId2 = Uuid.randomUuid();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-2",
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2,
List.of(0, 1)), Errors.NONE))
+ ));
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+ ))
+ ))
+ .build()
+ )).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId2,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // share-group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = List.of("share-group-id-1",
"share-group-id-2");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(2)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupCoordinatorShareSpecificWriteError()
throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(
+ result1.duplicate()
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.failedFuture(
+ Errors.COORDINATOR_NOT_AVAILABLE.exception()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ List<String> groupIds = List.of("share-group-id-1");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(0)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupNotEmptyError() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.forException(new GroupNotEmptyException("bad
stuff")).code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(
+ result1.duplicate()
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1", Map.entry(EMPTY_PARAMS,
Errors.forException(new GroupNotEmptyException("bad stuff"))))
+ ));
+
+ List<String> groupIds = List.of("share-group-id-1");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ // If there is error creating share group delete req
+ // neither persister call nor general delete groups call is made.
+ verify(persister, times(0)).deleteState(any());
+ verify(runtime, times(0)).scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ );
+ }
+
+ @Test
+ public void testDeleteShareGroupCoordinatorGeneralWriteError() throws
Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(
+ result1.duplicate()
+ );
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.failedFuture(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()));
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(new
DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ List<String> groupIds = List.of("share-group-id-1");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
Review Comment:
ditto.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -846,6 +861,152 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
(accumulator, newResults) -> newResults.forEach(result ->
accumulator.add(result.duplicate())));
}
+ List<String> deleteCandidateGroupIds(
+ Map<String, Errors> groupErrMap,
+ List<String> groupList,
+ DeleteGroupsResponseData.DeletableGroupResultCollection collection
+ ) {
+ List<String> errGroupIds = new ArrayList<>();
+ groupErrMap.forEach((groupId, error) -> {
+ if (error.code() != Errors.NONE.code()) {
+ log.error("Error deleting share group {} due to error {}",
groupId, error);
+ errGroupIds.add(groupId);
+ collection.add(
+ new DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ );
+ }
+ });
+
+ Set<String> groupSet = new HashSet<>(groupList);
+ // Remove all share group ids which have errored out
+ // when deleting with persister.
+ groupSet.removeAll(errGroupIds);
+
+ // Let us invoke the standard procedure of any non-share
+ // groups or successfully deleted share groups remaining.
+ return groupSet.stream().toList();
+ }
+
+ CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
handleDeleteGroups(
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2532,4 +2927,17 @@ public GroupCoordinatorServiceBuilder
setMetrics(GroupCoordinatorMetrics metrics
return this;
}
}
+
+ private DeleteShareGroupStateParameters createDeleteShareRequest(String
groupId, Uuid topic, List<Integer> partitions) {
Review Comment:
nit: Should we make it static?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -823,20 +831,27 @@ public
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
});
groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
- runtime.scheduleWriteOperation(
- "delete-groups",
- topicPartition,
- Duration.ofMillis(config.offsetCommitTimeoutMs()),
- coordinator -> coordinator.deleteGroups(context, groupList)
- ).exceptionally(exception -> handleOperationException(
- "delete-groups",
- groupList,
- exception,
- (error, __) ->
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
- log
- ));
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap
-> {
+ DeleteGroupsResponseData.DeletableGroupResultCollection
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ List<String> retainedGroupIds =
deleteCandidateGroupIds(groupErrMap, groupList, collection);
+ if (retainedGroupIds.isEmpty()) {
+ return CompletableFuture.completedFuture(collection);
+ }
+ return handleDeleteGroups(context, topicPartition,
retainedGroupIds)
+ .whenComplete((resp, __) -> resp.forEach(result ->
collection.add(result.duplicate())))
+ .thenApply(__ -> collection);
+ });
+ // deleteShareGroups has its own exceptionally block, so we don't
need one here.
+
+ // This future object has the following stages:
+ // - First it invokes the share group delete flow where the shard
sharePartitionDeleteRequests
+ // method is invoked, and it returns request objects for each
valid share group passed to it.
+ // - Then the requests are passed to the persister.deleteStata
method one at a time. The results
Review Comment:
nit: `deleteStata` -> `deleteState`?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ ));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(Map.of())); //
non-share group
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = Arrays.asList("share-group-id-1",
"group-id-2", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(1)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupPersisterError() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group success
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ result1.duplicate(),
+ result2.duplicate()));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+ Uuid shareGroupTopicId2 = Uuid.randomUuid();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-2",
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2,
List.of(0, 1)), Errors.NONE))
+ ));
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+ ))
+ ))
+ .build()
+ )).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId2,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // share-group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = List.of("share-group-id-1",
"share-group-id-2");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(2)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupCoordinatorShareSpecificWriteError()
throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.add(
+ result1.duplicate()
+ );
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.failedFuture(
+ Errors.COORDINATOR_NOT_AVAILABLE.exception()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ List<String> groupIds = List.of("share-group-id-1");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
Review Comment:
ditto.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
assertEquals(expectedResultCollection, future.get());
}
+ @Test
+ public void testDeleteWithShareGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1");
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // non-share group
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("group-id-2");
+ resultCollection2.add(result2);
+
+ // null
+ DeleteGroupsResponseData.DeletableGroupResult result3 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(List.of(
+ result3.duplicate(),
+ result2.duplicate(),
+ result1.duplicate()
+ ));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(Map.of())); //
non-share group
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = Arrays.asList("share-group-id-1",
"group-id-2", null);
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
+ assertEquals(expectedResultCollection, future.get());
+ verify(persister, times(1)).deleteState(any());
+ }
+
+ @Test
+ public void testDeleteShareGroupPersisterError() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ Persister persister = mock(Persister.class);
+ GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+ .setConfig(createConfig())
+ .setRuntime(runtime)
+ .setMetrics(mock(GroupCoordinatorMetrics.class))
+ .setPersister(persister)
+ .build();
+ service.startup(() -> 3);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection1 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group err
+ DeleteGroupsResponseData.DeletableGroupResult result1 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-1")
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+ resultCollection1.add(result1);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
resultCollection2 =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ // share group success
+ DeleteGroupsResponseData.DeletableGroupResult result2 = new
DeleteGroupsResponseData.DeletableGroupResult()
+ .setGroupId("share-group-id-2");
+ resultCollection2.add(result2);
+
+ DeleteGroupsResponseData.DeletableGroupResultCollection
expectedResultCollection =
+ new DeleteGroupsResponseData.DeletableGroupResultCollection();
+ expectedResultCollection.addAll(Arrays.asList(
+ result1.duplicate(),
+ result2.duplicate()));
+
+ Uuid shareGroupTopicId = Uuid.randomUuid();
+ Uuid shareGroupTopicId2 = Uuid.randomUuid();
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-share-groups"),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-1",
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId,
List.of(0, 1)), Errors.NONE))
+ )).thenReturn(CompletableFuture.completedFuture(
+ Map.of("share-group-id-2",
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2,
List.of(0, 1)), Errors.NONE))
+ ));
+
+
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+ ))
+ ))
+ .build()
+ )).thenReturn(CompletableFuture.completedFuture(
+ new DeleteShareGroupStateResult.Builder()
+ .setTopicsData(List.of(
+ new TopicData<>(
+ shareGroupTopicId2,
+ List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.NONE.code(), Errors.NONE.message()),
+ PartitionFactory.newPartitionErrorData(1,
Errors.NONE.code(), Errors.NONE.message())
+ ))
+ ))
+ .build()
+ ));
+
+ // share-group-id-1
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+ // share-group-id-2
+ when(runtime.scheduleWriteOperation(
+ ArgumentMatchers.eq("delete-groups"),
+ ArgumentMatchers.eq(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+ List<String> groupIds = List.of("share-group-id-1",
"share-group-id-2");
+
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection>
future =
+ service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS),
groupIds, BufferSupplier.NO_CACHING);
+
+ try {
+ future.getNow(null);
+ } catch (Exception e) {
+ fail(e);
+ }
Review Comment:
ditto.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]