dajac commented on code in PR #18848:
URL: https://github.com/apache/kafka/pull/18848#discussion_r1963519484


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6855,6 +6861,45 @@ public void createGroupTombstoneRecords(
         group.createGroupTombstoneRecords(records);
     }
 
+    /**
+     * Returns an optional of delete share group request object to be used 
with the persister.
+     * Empty if no subscribed topics or if the share group is empty.
+     * @param shareGroup - A share group
+     * @return Optional of object representing the share group state delete 
request.
+     */
+    public Optional<DeleteShareGroupStateParameters> 
shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) {
+        TopicsImage topicsImage = metadataImage.topics();
+        Set<String> subscribedTopics = 
shareGroup.subscribedTopicNames().keySet();

Review Comment:
   For my understanding, this set is always empty given that we require the 
group to be empty, right? This will be addressed in a follow-up PR if I 
understood correctly.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+            result3.duplicate(),
+            result2.duplicate(),
+            result1.duplicate()
+        ));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(Map.of()));   // 
non-share group
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = Arrays.asList("share-group-id-1", 
"group-id-2", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }

Review Comment:
   nit: This is not necessary because the exception will be caught anyway by 
the test running.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+            result3.duplicate(),
+            result2.duplicate(),
+            result1.duplicate()
+        ));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(Map.of()));   // 
non-share group
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = Arrays.asList("share-group-id-1", 
"group-id-2", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(1)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupPersisterError() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group success
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            result1.duplicate(),
+            result2.duplicate()));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+        Uuid shareGroupTopicId2 = Uuid.randomUuid();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-2", 
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, 
List.of(0, 1)), Errors.NONE))
+        ));
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+                        ))
+                ))
+                .build()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId2,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // share-group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = List.of("share-group-id-1", 
"share-group-id-2");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(2)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupCoordinatorShareSpecificWriteError() 
throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(
+            result1.duplicate()
+        );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.failedFuture(
+            Errors.COORDINATOR_NOT_AVAILABLE.exception()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        List<String> groupIds = List.of("share-group-id-1");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(0)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupNotEmptyError() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.forException(new GroupNotEmptyException("bad 
stuff")).code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(
+            result1.duplicate()
+        );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", Map.entry(EMPTY_PARAMS, 
Errors.forException(new GroupNotEmptyException("bad stuff"))))
+        ));
+
+        List<String> groupIds = List.of("share-group-id-1");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -846,6 +861,152 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> 
accumulator.add(result.duplicate())));
     }
 
+    List<String> deleteCandidateGroupIds(

Review Comment:
   Should it be private? If we use it from tests, let's add `// package-private 
for testing` please.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+            result3.duplicate(),
+            result2.duplicate(),
+            result1.duplicate()
+        ));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(Map.of()));   // 
non-share group
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = Arrays.asList("share-group-id-1", 
"group-id-2", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(1)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupPersisterError() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group success
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            result1.duplicate(),
+            result2.duplicate()));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+        Uuid shareGroupTopicId2 = Uuid.randomUuid();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-2", 
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, 
List.of(0, 1)), Errors.NONE))
+        ));
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+                        ))
+                ))
+                .build()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId2,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // share-group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = List.of("share-group-id-1", 
"share-group-id-2");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(2)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupCoordinatorShareSpecificWriteError() 
throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(
+            result1.duplicate()
+        );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.failedFuture(
+            Errors.COORDINATOR_NOT_AVAILABLE.exception()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        List<String> groupIds = List.of("share-group-id-1");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(0)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupNotEmptyError() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.forException(new GroupNotEmptyException("bad 
stuff")).code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(
+            result1.duplicate()
+        );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", Map.entry(EMPTY_PARAMS, 
Errors.forException(new GroupNotEmptyException("bad stuff"))))
+        ));
+
+        List<String> groupIds = List.of("share-group-id-1");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        // If there is error creating share group delete req
+        // neither persister call nor general delete groups call is made.
+        verify(persister, times(0)).deleteState(any());
+        verify(runtime, times(0)).scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        );
+    }
+
+    @Test
+    public void testDeleteShareGroupCoordinatorGeneralWriteError() throws 
Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.CLUSTER_AUTHORIZATION_FAILED.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(
+            result1.duplicate()
+        );
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.failedFuture(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()));
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(new
 DeleteShareGroupStateResult.Builder()
+            .setTopicsData(List.of(
+                new TopicData<>(
+                    shareGroupTopicId,
+                    List.of(
+                        PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                        PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                    ))
+            ))
+            .build()
+        ));
+
+        List<String> groupIds = List.of("share-group-id-1");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }

Review Comment:
   ditto.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -846,6 +861,152 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
             (accumulator, newResults) -> newResults.forEach(result -> 
accumulator.add(result.duplicate())));
     }
 
+    List<String> deleteCandidateGroupIds(
+        Map<String, Errors> groupErrMap,
+        List<String> groupList,
+        DeleteGroupsResponseData.DeletableGroupResultCollection collection
+    ) {
+        List<String> errGroupIds = new ArrayList<>();
+        groupErrMap.forEach((groupId, error) -> {
+            if (error.code() != Errors.NONE.code()) {
+                log.error("Error deleting share group {} due to error {}", 
groupId, error);
+                errGroupIds.add(groupId);
+                collection.add(
+                    new DeleteGroupsResponseData.DeletableGroupResult()
+                        .setGroupId(groupId)
+                        .setErrorCode(error.code())
+                );
+            }
+        });
+
+        Set<String> groupSet = new HashSet<>(groupList);
+        // Remove all share group ids which have errored out
+        // when deleting with persister.
+        groupSet.removeAll(errGroupIds);
+
+        // Let us invoke the standard procedure of any non-share
+        // groups or successfully deleted share groups remaining.
+        return groupSet.stream().toList();
+    }
+
+    CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
handleDeleteGroups(

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -2532,4 +2927,17 @@ public GroupCoordinatorServiceBuilder 
setMetrics(GroupCoordinatorMetrics metrics
             return this;
         }
     }
+
+    private DeleteShareGroupStateParameters createDeleteShareRequest(String 
groupId, Uuid topic, List<Integer> partitions) {

Review Comment:
   nit: Should we make it static?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -823,20 +831,27 @@ public 
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection
         });
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
-            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
-                runtime.scheduleWriteOperation(
-                    "delete-groups",
-                    topicPartition,
-                    Duration.ofMillis(config.offsetCommitTimeoutMs()),
-                    coordinator -> coordinator.deleteGroups(context, groupList)
-                ).exceptionally(exception -> handleOperationException(
-                    "delete-groups",
-                    groupList,
-                    exception,
-                    (error, __) -> 
DeleteGroupsRequest.getErrorResultCollection(groupList, error),
-                    log
-                ));
+            
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap 
-> {
+                DeleteGroupsResponseData.DeletableGroupResultCollection 
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
+                List<String> retainedGroupIds = 
deleteCandidateGroupIds(groupErrMap, groupList, collection);
+                if (retainedGroupIds.isEmpty()) {
+                    return CompletableFuture.completedFuture(collection);
+                }
 
+                return handleDeleteGroups(context, topicPartition, 
retainedGroupIds)
+                    .whenComplete((resp, __) -> resp.forEach(result -> 
collection.add(result.duplicate())))
+                    .thenApply(__ -> collection);
+            });
+            // deleteShareGroups has its own exceptionally block, so we don't 
need one here.
+
+            // This future object has the following stages:
+            // - First it invokes the share group delete flow where the shard 
sharePartitionDeleteRequests
+            // method is invoked, and it returns request objects for each 
valid share group passed to it.
+            // - Then the requests are passed to the persister.deleteStata 
method one at a time. The results

Review Comment:
   nit: `deleteStata` -> `deleteState`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+            result3.duplicate(),
+            result2.duplicate(),
+            result1.duplicate()
+        ));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(Map.of()));   // 
non-share group
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = Arrays.asList("share-group-id-1", 
"group-id-2", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(1)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupPersisterError() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group success
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            result1.duplicate(),
+            result2.duplicate()));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+        Uuid shareGroupTopicId2 = Uuid.randomUuid();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-2", 
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, 
List.of(0, 1)), Errors.NONE))
+        ));
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+                        ))
+                ))
+                .build()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId2,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // share-group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = List.of("share-group-id-1", 
"share-group-id-2");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(2)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupCoordinatorShareSpecificWriteError() 
throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.add(
+            result1.duplicate()
+        );
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.failedFuture(
+            Errors.COORDINATOR_NOT_AVAILABLE.exception()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        List<String> groupIds = List.of("share-group-id-1");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1627,6 +1643,378 @@ public void testDeleteGroups() throws Exception {
         assertEquals(expectedResultCollection, future.get());
     }
 
+    @Test
+    public void testDeleteWithShareGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1");
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // non-share group
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("group-id-2");
+        resultCollection2.add(result2);
+
+        // null
+        DeleteGroupsResponseData.DeletableGroupResult result3 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(List.of(
+            result3.duplicate(),
+            result2.duplicate(),
+            result1.duplicate()
+        ));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(Map.of()));   // 
non-share group
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = Arrays.asList("share-group-id-1", 
"group-id-2", null);
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }
+        assertEquals(expectedResultCollection, future.get());
+        verify(persister, times(1)).deleteState(any());
+    }
+
+    @Test
+    public void testDeleteShareGroupPersisterError() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        Persister persister = mock(Persister.class);
+        GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
+            .setConfig(createConfig())
+            .setRuntime(runtime)
+            .setMetrics(mock(GroupCoordinatorMetrics.class))
+            .setPersister(persister)
+            .build();
+        service.startup(() -> 3);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection1 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group err
+        DeleteGroupsResponseData.DeletableGroupResult result1 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-1")
+            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
+        resultCollection1.add(result1);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
resultCollection2 =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        // share group success
+        DeleteGroupsResponseData.DeletableGroupResult result2 = new 
DeleteGroupsResponseData.DeletableGroupResult()
+            .setGroupId("share-group-id-2");
+        resultCollection2.add(result2);
+
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
expectedResultCollection =
+            new DeleteGroupsResponseData.DeletableGroupResultCollection();
+        expectedResultCollection.addAll(Arrays.asList(
+            result1.duplicate(),
+            result2.duplicate()));
+
+        Uuid shareGroupTopicId = Uuid.randomUuid();
+        Uuid shareGroupTopicId2 = Uuid.randomUuid();
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-share-groups"),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any(),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-1", 
Map.entry(createDeleteShareRequest("share-group-id-1", shareGroupTopicId, 
List.of(0, 1)), Errors.NONE))
+        )).thenReturn(CompletableFuture.completedFuture(
+            Map.of("share-group-id-2", 
Map.entry(createDeleteShareRequest("share-group-id-2", shareGroupTopicId2, 
List.of(0, 1)), Errors.NONE))
+        ));
+
+        
when(persister.deleteState(any())).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.UNKNOWN_SERVER_ERROR.code(), Errors.UNKNOWN_SERVER_ERROR.message())
+                        ))
+                ))
+                .build()
+        )).thenReturn(CompletableFuture.completedFuture(
+            new DeleteShareGroupStateResult.Builder()
+                .setTopicsData(List.of(
+                    new TopicData<>(
+                        shareGroupTopicId2,
+                        List.of(
+                            PartitionFactory.newPartitionErrorData(0, 
Errors.NONE.code(), Errors.NONE.message()),
+                            PartitionFactory.newPartitionErrorData(1, 
Errors.NONE.code(), Errors.NONE.message())
+                        ))
+                ))
+                .build()
+        ));
+
+        // share-group-id-1
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection1));
+
+        // share-group-id-2
+        when(runtime.scheduleWriteOperation(
+            ArgumentMatchers.eq("delete-groups"),
+            ArgumentMatchers.eq(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 2)),
+            ArgumentMatchers.eq(Duration.ofMillis(5000)),
+            ArgumentMatchers.any()
+        )).thenReturn(CompletableFuture.completedFuture(resultCollection2));
+
+        List<String> groupIds = List.of("share-group-id-1", 
"share-group-id-2");
+        
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future =
+            service.deleteGroups(requestContext(ApiKeys.DELETE_GROUPS), 
groupIds, BufferSupplier.NO_CACHING);
+
+        try {
+            future.getNow(null);
+        } catch (Exception e) {
+            fail(e);
+        }

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to