This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a6dfde7ce64 KAFKA-18629: Utilize share group partition metadata for 
delete group. (#19363)
a6dfde7ce64 is described below

commit a6dfde7ce64ee00d257bb6ff20ce923dacdc0edb
Author: Sushant Mahajan <[email protected]>
AuthorDate: Fri Apr 11 00:45:13 2025 +0530

    KAFKA-18629: Utilize share group partition metadata for delete group. 
(#19363)
    
    * Currently, the delete share group code flow uses
    `group.subscribedTopicNames()` to fetch information about all the share
    partitions to which a share group is subscribed.
    * However, this is incorrect since once the group is EMPTY, a
    precondition for delete, the aforementioned method will return an empty
    map.
    * In this PR we have leveraged the `ShareGroupStatePartitionMetadata`
    record to grab the `initialized` and `initializing` partitions to build
    the delete candidates, which remedies the situation.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../group/GroupCoordinatorRecordHelpers.java       |  15 +++
 .../coordinator/group/GroupCoordinatorService.java |  34 ++++---
 .../coordinator/group/GroupCoordinatorShard.java   |   9 +-
 .../coordinator/group/GroupMetadataManager.java    |  75 ++++++++------
 .../coordinator/group/modern/share/ShareGroup.java |   1 +
 .../apache/kafka/coordinator/group/Assertions.java |  31 +++++-
 .../group/GroupCoordinatorShardTest.java           |  43 ++++----
 .../group/GroupMetadataManagerTest.java            | 108 ++++++++++++++-------
 .../coordinator/share/ShareCoordinatorShard.java   |  10 ++
 .../share/ShareCoordinatorShardTest.java           |  23 ++---
 10 files changed, 232 insertions(+), 117 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
index d0f6901d444..42790515a2d 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java
@@ -814,6 +814,21 @@ public class GroupCoordinatorRecordHelpers {
         );
     }
 
+    /**
+     * Creates a ShareGroupStatePartitionMetadata tombstone.
+     *
+     * @param groupId   The share group id.
+     * @return The record.
+     */
+    public static CoordinatorRecord 
newShareGroupStatePartitionMetadataTombstoneRecord(
+        String groupId
+    ) {
+        return CoordinatorRecord.tombstone(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId)
+        );
+    }
+
     /**
      * Creates a ShareGroupStatePartitionMetadata record.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 97bc7e69622..c00502af48f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1073,24 +1073,25 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
 
         groupsByTopicPartition.forEach((topicPartition, groupList) -> {
             
CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> 
future = deleteShareGroups(topicPartition, groupList).thenCompose(groupErrMap 
-> {
-                DeleteGroupsResponseData.DeletableGroupResultCollection 
collection = new DeleteGroupsResponseData.DeletableGroupResultCollection();
-                List<String> retainedGroupIds = 
deleteCandidateGroupIds(groupErrMap, groupList, collection);
+                DeleteGroupsResponseData.DeletableGroupResultCollection 
deletableGroupResults = new 
DeleteGroupsResponseData.DeletableGroupResultCollection();
+                List<String> retainedGroupIds = 
updateResponseAndGetNonErrorGroupList(groupErrMap, groupList, 
deletableGroupResults);
                 if (retainedGroupIds.isEmpty()) {
-                    return CompletableFuture.completedFuture(collection);
+                    return 
CompletableFuture.completedFuture(deletableGroupResults);
                 }
 
                 return handleDeleteGroups(context, topicPartition, 
retainedGroupIds)
-                    .whenComplete((resp, __) -> resp.forEach(result -> 
collection.add(result.duplicate())))
-                    .thenApply(__ -> collection);
+                    .whenComplete((resp, __) -> resp.forEach(result -> 
deletableGroupResults.add(result.duplicate())))
+                    .thenApply(__ -> deletableGroupResults);
             });
             // deleteShareGroups has its own exceptionally block, so we don't 
need one here.
 
             // This future object has the following stages:
             // - First it invokes the share group delete flow where the shard 
sharePartitionDeleteRequests
             // method is invoked, and it returns request objects for each 
valid share group passed to it.
+            // All initialized and initializing share partitions are moved to 
deleting.
             // - Then the requests are passed to the persister.deleteState 
method one at a time. The results
             // are collated as a Map of groupId -> persister errors
-            // - The above map is then used to decide whether to invoke the 
group coordinator delete groups logic
+            // - The above map can be used to decide whether to invoke the 
group coordinator delete groups logic
             // - Share groups with failed persister delete are NOT CONSIDERED 
for group coordinator delete.
             // TLDR: DeleteShareGroups -> filter erroneous persister deletes 
-> general delete groups logic
             futures.add(future);
@@ -1102,17 +1103,26 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             (accumulator, newResults) -> newResults.forEach(result -> 
accumulator.add(result.duplicate())));
     }
 
-    private List<String> deleteCandidateGroupIds(
-        Map<String, Errors> groupErrMap,
+    /**
+     * Processes input shareGroupErrMap by retaining only those which do not 
contain an error.
+     * Also updates the result collection input arg with share groups 
containing errors.
+     *
+     * @param shareGroupErrMap      Map keyed on share groupId and value as 
the error (NONE for no error).
+     * @param groupList             Entire list of groups (all types)
+     * @param deletableGroupResults Collection of responses for delete groups 
request.
+     * @return A list of all non-error groupIds
+     */
+    private List<String> updateResponseAndGetNonErrorGroupList(
+        Map<String, Errors> shareGroupErrMap,
         List<String> groupList,
-        DeleteGroupsResponseData.DeletableGroupResultCollection collection
+        DeleteGroupsResponseData.DeletableGroupResultCollection 
deletableGroupResults
     ) {
         List<String> errGroupIds = new ArrayList<>();
-        groupErrMap.forEach((groupId, error) -> {
+        shareGroupErrMap.forEach((groupId, error) -> {
             if (error.code() != Errors.NONE.code()) {
                 log.error("Error deleting share group {} due to error {}", 
groupId, error);
                 errGroupIds.add(groupId);
-                collection.add(
+                deletableGroupResults.add(
                     new DeleteGroupsResponseData.DeletableGroupResult()
                         .setGroupId(groupId)
                         .setErrorCode(error.code())
@@ -1153,7 +1163,7 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         TopicPartition topicPartition,
         List<String> groupList
     ) {
-        // topicPartition refers to internal topic __consumer_offsets
+        // topicPartition refers to internal topic __consumer_offsets.
         return runtime.scheduleWriteOperation(
             "delete-share-groups",
             topicPartition,
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index ded14600dab..812641c9459 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -589,18 +589,19 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
      * Method returns a Map keyed on groupId and value as pair of {@link 
DeleteShareGroupStateParameters}
      * and any ERRORS while building the request corresponding
      * to the valid share groups passed as the input.
-     * <p></p>
+     * <p>
      * The groupIds are first filtered by type to restrict the list to share 
groups.
      * @param groupIds - A list of groupIds as string
-     * @return {@link CoordinatorResult} object always containing empty 
records and Map keyed on groupId and value pair (req, error)
+     * @return A result object containing a map keyed on groupId and value 
pair (req, error) and related coordinator records.
      */
     public CoordinatorResult<Map<String, 
Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> 
sharePartitionDeleteRequests(List<String> groupIds) {
         Map<String, Map.Entry<DeleteShareGroupStateParameters, Errors>> 
responseMap = new HashMap<>();
+        List<CoordinatorRecord> records = new ArrayList<>();
         for (String groupId : groupIds) {
             try {
                 ShareGroup group = groupMetadataManager.shareGroup(groupId);
                 group.validateDeleteGroup();
-                
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(group)
+                
groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, records)
                     .ifPresent(req -> responseMap.put(groupId, Map.entry(req, 
Errors.NONE)));
             } catch (GroupIdNotFoundException exception) {
                 log.debug("GroupId {} not found as a share group.", groupId);
@@ -609,7 +610,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
                 responseMap.put(groupId, 
Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, 
Errors.forException(exception)));
             }
         }
-        return new CoordinatorResult<>(List.of(), responseMap);
+        return new CoordinatorResult<>(records, responseMap);
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 08e9159b02d..f8a114d999f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2984,7 +2984,7 @@ public class GroupMetadataManager {
                 groupId,
                 attachTopicName(finalInitializingMap),
                 attachTopicName(currentMap.initializedTopics()),
-                Map.of()
+                attachTopicName(currentMap.deletingTopics())
             )
         );
     }
@@ -4979,7 +4979,7 @@ public class GroupMetadataManager {
                 group.groupId(),
                 attachTopicName(finalInitializingMap),
                 attachTopicName(finalInitializedMap),
-                Map.of()
+                attachTopicName(currentMap.deletingTopics())
             )),
             null
         );
@@ -5025,7 +5025,7 @@ public class GroupMetadataManager {
                     groupId,
                     attachTopicName(finalInitializingTopics),
                     attachTopicName(info.initializedTopics()),
-                    Map.of()
+                    attachTopicName(info.deletingTopics())
                 )
             ),
             null
@@ -5057,6 +5057,13 @@ public class GroupMetadataManager {
         return requests;
     }
 
+    private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) {
+        TopicsImage topicsImage = metadataImage.topics();
+        return topicIds.stream()
+            .map(topicId -> Map.entry(topicId, 
topicsImage.getTopic(topicId).name()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
     private Map<Uuid, Map.Entry<String, Set<Integer>>> 
attachTopicName(Map<Uuid, Set<Integer>> initMap) {
         TopicsImage topicsImage = metadataImage.topics();
         Map<Uuid, Map.Entry<String, Set<Integer>>> finalMap = new HashMap<>();
@@ -8142,39 +8149,49 @@ public class GroupMetadataManager {
     /**
      * Returns an optional of delete share group request object to be used 
with the persister.
      * Empty if no subscribed topics or if the share group is empty.
-     * @param shareGroup - A share group
+     * @param shareGroupId      Share group id
+     * @param records           List of coordinator records to append to
      * @return Optional of object representing the share group state delete 
request.
      */
-    public Optional<DeleteShareGroupStateParameters> 
shareGroupBuildPartitionDeleteRequest(ShareGroup shareGroup) {
-        TopicsImage topicsImage = metadataImage.topics();
-        Set<String> subscribedTopics = 
shareGroup.subscribedTopicNames().keySet();
-        List<TopicData<PartitionIdData>> topicDataList = new 
ArrayList<>(subscribedTopics.size());
-
-        for (String topic : subscribedTopics) {
-            TopicImage topicImage = topicsImage.getTopic(topic);
-            topicDataList.add(
-                new TopicData<>(
-                    topicImage.id(),
-                    topicImage.partitions().keySet().stream()
-                        .map(PartitionFactory::newPartitionIdData)
-                        .toList()
-                )
-            );
+    public Optional<DeleteShareGroupStateParameters> 
shareGroupBuildPartitionDeleteRequest(String shareGroupId, 
List<CoordinatorRecord> records) {
+        if (!shareGroupPartitionMetadata.containsKey(shareGroupId)) {
+            return Optional.empty();
         }
 
-        if (topicDataList.isEmpty()) {
+        Map<Uuid, Set<Integer>> deleteCandidates = mergeShareGroupInitMaps(
+            shareGroupPartitionMetadata.get(shareGroupId).initializedTopics(),
+            shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
+        );
+
+        if (deleteCandidates.isEmpty()) {
             return Optional.empty();
         }
 
-        return Optional.of(
-            new DeleteShareGroupStateParameters.Builder()
-                .setGroupTopicPartitionData(
-                    new GroupTopicPartitionData.Builder<PartitionIdData>()
-                        .setGroupId(shareGroup.groupId())
-                        .setTopicsData(topicDataList)
-                        .build()
-                )
-                .build()
+        List<TopicData<PartitionIdData>> topicDataList = new 
ArrayList<>(deleteCandidates.size());
+
+        for (Map.Entry<Uuid, Set<Integer>> entry : 
deleteCandidates.entrySet()) {
+            topicDataList.add(new TopicData<>(
+                entry.getKey(),
+                entry.getValue().stream()
+                    .map(PartitionFactory::newPartitionIdData)
+                    .toList()
+            ));
+        }
+
+        // Remove all initializing and initialized topic info from record and 
add deleting.
+        
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+            shareGroupId,
+            Map.of(),
+            Map.of(),
+            attachTopicName(deleteCandidates.keySet())
+        ));
+
+        return Optional.of(new DeleteShareGroupStateParameters.Builder()
+            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
+                .setGroupId(shareGroupId)
+                .setTopicsData(topicDataList)
+                .build())
+            .build()
         );
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index 1e7d11f866a..b63100744c6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -236,6 +236,7 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
         );
 
         
records.add(GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId()));
+        
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId()));
         
records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId()));
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
index 227d07a8def..a593bb3d664 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/Assertions.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetada
 import 
org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
 import 
org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
+import 
org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 
 import org.opentest4j.AssertionFailedError;
@@ -60,7 +61,8 @@ public class Assertions {
         ConsumerGroupPartitionMetadataValue.class, 
Assertions::assertConsumerGroupPartitionMetadataValue,
         GroupMetadataValue.class, Assertions::assertGroupMetadataValue,
         ConsumerGroupTargetAssignmentMemberValue.class, 
Assertions::assertConsumerGroupTargetAssignmentMemberValue,
-        ShareGroupPartitionMetadataValue.class, 
Assertions::assertShareGroupPartitionMetadataValue
+        ShareGroupPartitionMetadataValue.class, 
Assertions::assertShareGroupPartitionMetadataValue,
+        ShareGroupStatePartitionMetadataValue.class, 
Assertions::assertShareGroupStatePartitionMetadataValue
     );
 
     public static void assertResponseEquals(
@@ -285,6 +287,33 @@ public class Assertions {
         assertEquals(expected, actual);
     }
 
+    private static void assertShareGroupStatePartitionMetadataValue(
+        ApiMessage exp,
+        ApiMessage act
+    ) {
+        ShareGroupStatePartitionMetadataValue expected = 
(ShareGroupStatePartitionMetadataValue) exp.duplicate();
+        ShareGroupStatePartitionMetadataValue actual = 
(ShareGroupStatePartitionMetadataValue) act.duplicate();
+
+        Consumer<ShareGroupStatePartitionMetadataValue> normalize = message -> 
{
+            
message.initializedTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
+            message.initializedTopics().forEach(topic -> {
+                topic.partitions().sort(Comparator.naturalOrder());
+            });
+
+            
message.initializingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo::topicId));
+            message.initializingTopics().forEach(topic -> {
+                topic.partitions().sort(Comparator.naturalOrder());
+            });
+
+            
message.deletingTopics().sort(Comparator.comparing(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId));
+        };
+
+        normalize.accept(expected);
+        normalize.accept(actual);
+
+        assertEquals(expected, actual);
+    }
+
     private static void assertGroupMetadataValue(
         ApiMessage exp,
         ApiMessage act
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index f8e0d8a70f6..16665f98bf2 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -1860,9 +1860,10 @@ public class GroupCoordinatorShardTest {
             metricsShard
         );
 
-        ShareGroup shareGroup = new ShareGroup(new 
SnapshotRegistry(mock(LogContext.class)), "share-group");
+        String groupId = "share-group";
+        ShareGroup shareGroup = new ShareGroup(new 
SnapshotRegistry(mock(LogContext.class)), groupId);
 
-        
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup);
+        
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
         
when(groupMetadataManager.shareGroup(eq("non-share-group"))).thenThrow(GroupIdNotFoundException.class);
 
         TopicData<PartitionIdData> topicData = new 
TopicData<>(Uuid.randomUuid(),
@@ -1873,21 +1874,20 @@ public class GroupCoordinatorShardTest {
 
         DeleteShareGroupStateParameters params = new 
DeleteShareGroupStateParameters.Builder()
             .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
-                .setGroupId("share-group")
+                .setGroupId(groupId)
                 .setTopicsData(List.of(topicData))
                 .build())
             .build();
 
-        
when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(shareGroup))).thenReturn(Optional.of(params));
+        
when(groupMetadataManager.shareGroupBuildPartitionDeleteRequest(eq(groupId), 
anyList())).thenReturn(Optional.of(params));
 
         CoordinatorResult<Map<String, 
Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> 
expectedResult =
-            new CoordinatorResult<>(List.of(), Map.of("share-group", 
Map.entry(params, Errors.NONE)));
+            new CoordinatorResult<>(List.of(), Map.of(groupId, 
Map.entry(params, Errors.NONE)));
 
-
-        assertEquals(expectedResult, 
coordinator.sharePartitionDeleteRequests(List.of("share-group", 
"non-share-group")));
-        verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
+        assertEquals(expectedResult, 
coordinator.sharePartitionDeleteRequests(List.of(groupId, "non-share-group")));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
         verify(groupMetadataManager, 
times(1)).shareGroup(eq("non-share-group"));
-        verify(groupMetadataManager, 
times(1)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+        verify(groupMetadataManager, 
times(1)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
 
         // empty list
         Mockito.reset(groupMetadataManager);
@@ -1897,9 +1897,9 @@ public class GroupCoordinatorShardTest {
             coordinator.sharePartitionDeleteRequests(List.of())
         );
 
-        verify(groupMetadataManager, times(0)).group(eq("share-group"));
+        verify(groupMetadataManager, times(0)).group(eq(groupId));
         verify(groupMetadataManager, times(0)).group(eq("non-share-group"));
-        verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+        verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
     }
 
     @Test
@@ -1919,19 +1919,24 @@ public class GroupCoordinatorShardTest {
             metricsShard
         );
 
+        String groupId = "share-group";
         ShareGroup shareGroup = mock(ShareGroup.class);
         doThrow(new GroupNotEmptyException("bad 
stuff")).when(shareGroup).validateDeleteGroup();
 
-        
when(groupMetadataManager.shareGroup(eq("share-group"))).thenReturn(shareGroup);
+        
when(groupMetadataManager.shareGroup(eq(groupId))).thenReturn(shareGroup);
 
         CoordinatorResult<Map<String, 
Map.Entry<DeleteShareGroupStateParameters, Errors>>, CoordinatorRecord> 
expectedResult =
-            new CoordinatorResult<>(List.of(), Map.of("share-group", 
Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS,
-                Errors.forException(new GroupNotEmptyException("bad stuff")))
-            ));
-        assertEquals(expectedResult, 
coordinator.sharePartitionDeleteRequests(List.of("share-group")));
-        verify(groupMetadataManager, times(1)).shareGroup(eq("share-group"));
+            new CoordinatorResult<>(
+                List.of(),
+                Map.of(
+                    groupId,
+                    Map.entry(DeleteShareGroupStateParameters.EMPTY_PARAMS, 
Errors.forException(new GroupNotEmptyException("bad stuff")))
+                )
+            );
+        assertEquals(expectedResult, 
coordinator.sharePartitionDeleteRequests(List.of(groupId)));
+        verify(groupMetadataManager, times(1)).shareGroup(eq(groupId));
         // Not called because of NON-EMPTY group.
-        verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+        verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
 
         // empty list
         Mockito.reset(groupMetadataManager);
@@ -1942,6 +1947,6 @@ public class GroupCoordinatorShardTest {
         );
 
         verify(groupMetadataManager, times(0)).group(eq("share-group"));
-        verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(shareGroup));
+        verify(groupMetadataManager, 
times(0)).shareGroupBuildPartitionDeleteRequest(eq(groupId), anyList());
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 95e247beef7..35574d35ca3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -99,7 +99,6 @@ import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
 import org.apache.kafka.coordinator.group.modern.Assignment;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
 import org.apache.kafka.coordinator.group.modern.MemberState;
-import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
 import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
@@ -128,9 +127,7 @@ import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
-import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
 import 
org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
-import org.apache.kafka.server.share.persister.PartitionFactory;
 import org.apache.kafka.server.share.persister.PartitionIdData;
 import org.apache.kafka.server.share.persister.PartitionStateData;
 import org.apache.kafka.server.share.persister.TopicData;
@@ -147,7 +144,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -209,7 +205,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -15421,7 +15416,7 @@ public class GroupMetadataManagerTest {
     }
 
     @Test
-    public void testShareGroupDelete() {
+    public void testShareGroupDeleteTombstones() {
         String groupId = "share-group-id";
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withShareGroup(new ShareGroupBuilder(groupId, 10))
@@ -15430,6 +15425,7 @@ public class GroupMetadataManagerTest {
         List<CoordinatorRecord> expectedRecords = List.of(
             
GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentEpochTombstoneRecord(groupId),
             
GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataTombstoneRecord(groupId),
+            
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataTombstoneRecord(groupId),
             
GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord(groupId)
         );
         List<CoordinatorRecord> records = new ArrayList<>();
@@ -20745,7 +20741,7 @@ public class GroupMetadataManagerTest {
     }
 
     @Test
-    public void testSharePartitionDeleteRequest() {
+    public void testShareGroupDeleteRequest() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -20754,38 +20750,58 @@ public class GroupMetadataManagerTest {
 
         Uuid t1Uuid = Uuid.randomUuid();
         Uuid t2Uuid = Uuid.randomUuid();
-        MetadataImage image = spy(new MetadataImageBuilder()
-            .addTopic(t1Uuid, "t1", 2)
-            .addTopic(t2Uuid, "t2", 2)
-            .build());
-
-        context.groupMetadataManager.onNewMetadataImage(image, 
mock(MetadataDelta.class));
+        String t1Name = "t1";
+        String t2Name = "t2";
 
+        String groupId = "share-group";
         ShareGroup shareGroup = mock(ShareGroup.class);
-        Map<String, SubscriptionCount> topicMap = new LinkedHashMap<>();
-        topicMap.put("t1", mock(SubscriptionCount.class));
-        topicMap.put("t2", mock(SubscriptionCount.class));
-        when(shareGroup.subscribedTopicNames()).thenReturn(topicMap);
-        when(shareGroup.groupId()).thenReturn("share-group");
+        when(shareGroup.groupId()).thenReturn(groupId);
         when(shareGroup.isEmpty()).thenReturn(false);
 
-        DeleteShareGroupStateParameters expectedParameters = new 
DeleteShareGroupStateParameters.Builder()
-            .setGroupTopicPartitionData(new 
GroupTopicPartitionData.Builder<PartitionIdData>()
-                .setGroupId("share-group")
-                .setTopicsData(List.of(
-                    new TopicData<>(t1Uuid, 
List.of(PartitionFactory.newPartitionIdData(0), 
PartitionFactory.newPartitionIdData(1))),
-                    new TopicData<>(t2Uuid, 
List.of(PartitionFactory.newPartitionIdData(0), 
PartitionFactory.newPartitionIdData(1)))
-                ))
-                .build()
-            )
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(t1Uuid, t1Name, 2)
+            .addTopic(t2Uuid, t2Name, 2)
             .build();
-        Optional<DeleteShareGroupStateParameters> params = 
context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(shareGroup);
-        assertTrue(params.isPresent());
-        assertEquals(expectedParameters.groupTopicPartitionData(), 
params.get().groupTopicPartitionData());
 
-        verify(image, times(1)).topics();
-        verify(shareGroup, times(1)).subscribedTopicNames();
-        verify(shareGroup, times(1)).groupId();
+        MetadataDelta delta = new MetadataDelta(image);
+        context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 
0));
+
+        context.replay(
+            
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+                groupId,
+                Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
+                Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
+                Map.of()
+            )
+        );
+
+        context.commit();
+
+        Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
+            t1Uuid, Set.of(0, 1),
+            t2Uuid, Set.of(0, 1)
+        );
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                groupId,
+                Map.of(),
+                Map.of(),
+                Map.of(t1Uuid, t1Name, t2Uuid, t2Name)
+            )
+        );
+
+        List<CoordinatorRecord> records = new ArrayList<>();
+        Optional<DeleteShareGroupStateParameters> params = 
context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, 
records);
+        verifyShareGroupDeleteRequest(
+            params,
+            expectedTopicPartitionMap,
+            groupId,
+            true
+        );
+        assertRecordsEquals(expectedRecords, records);
     }
 
     @Test
@@ -20844,14 +20860,15 @@ public class GroupMetadataManagerTest {
             new ShareGroupStatePartitionMetadataKey()
                 .setGroupId(groupId),
             new ShareGroupStatePartitionMetadataValue()
+                .setInitializingTopics(List.of())
                 .setInitializedTopics(List.of(
                     new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                         .setTopicId(t1Uuid)
-                        .setTopicName("t1")
+                        .setTopicName(t1Name)
                         .setPartitions(List.of(0, 1)),
                     new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
                         .setTopicId(t2Uuid)
-                        .setTopicName("t2")
+                        .setTopicName(t2Name)
                         .setPartitions(List.of(0, 1))
                 ))
                 .setDeletingTopics(List.of())
@@ -21292,4 +21309,25 @@ public class GroupMetadataManagerTest {
             assertTrue(initRequest.isEmpty());
         }
     }
+
+    private void verifyShareGroupDeleteRequest( // Test helper: asserts the delete request is present with the expected group/partitions, or is empty.
+        Optional<DeleteShareGroupStateParameters> deleteRequest, // request under verification
+        Map<Uuid, Set<Integer>> expectedTopicPartitionsMap, // expected topic id -> partition ids (only checked when shouldExist is true)
+        String groupId, // group id the request is expected to carry
+        boolean shouldExist // true: request must be present; false: request must be empty
+    ) {
+        if (shouldExist) {
+            assertTrue(deleteRequest.isPresent()); // fail with an assertion before calling get()
+            DeleteShareGroupStateParameters request = deleteRequest.get();
+            assertEquals(groupId, request.groupTopicPartitionData().groupId());
+            Map<Uuid, Set<Integer>> actualTopicPartitionsMap = new HashMap<>(); // collected as topic id -> set of partitions, so the comparison below ignores ordering
+            for (TopicData<PartitionIdData> topicData : 
request.groupTopicPartitionData().topicsData()) {
+                actualTopicPartitionsMap.computeIfAbsent(topicData.topicId(), 
k -> new HashSet<>())
+                    
.addAll(topicData.partitions().stream().map(PartitionIdData::partition).toList());
+            }
+            assertEquals(expectedTopicPartitionsMap, actualTopicPartitionsMap);
+        } else {
+            assertTrue(deleteRequest.isEmpty()); // no delete request expected in this case
+        }
+    }
 }
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index e0118ee2499..35891bddf8d 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -515,6 +515,16 @@ public class ShareCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         DeleteShareGroupStateRequestData.PartitionData partitionData = 
topicData.partitions().get(0);
         SharePartitionKey key = 
SharePartitionKey.getInstance(request.groupId(), topicData.topicId(), 
partitionData.partition());
 
+        if (!shareStateMap.containsKey(key)) {
+            log.warn("Attempted to delete non-existent share partition {}.", 
key);
+            return new CoordinatorResult<>(List.of(), new 
DeleteShareGroupStateResponseData().setResults(
+                
List.of(DeleteShareGroupStateResponse.toResponseDeleteStateResult(key.topicId(),
+                    
List.of(DeleteShareGroupStateResponse.toResponsePartitionResult(
+                        key.partition()))
+                ))
+            ));
+        }
+
         CoordinatorRecord record = generateTombstoneRecord(key);
         // build successful response if record is correctly created
         DeleteShareGroupStateResponseData responseData = new 
DeleteShareGroupStateResponseData().setResults(
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 0426c4e5005..20be20832d8 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -994,9 +994,7 @@ class ShareCoordinatorShardTest {
                 .setPartitions(List.of(new 
DeleteShareGroupStateRequestData.PartitionData()
                     .setPartition(PARTITION)))));
 
-        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
-
-        // apply a record in to verify delete
+        // Apply a record to the state machine so that delete can be verified.
         CoordinatorRecord record = 
ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
             GROUP_ID,
             TOPIC_ID,
@@ -1021,7 +1019,9 @@ class ShareCoordinatorShardTest {
         assertNotNull(shard.getLeaderMapValue(shareCoordinatorKey));
         assertNotNull(shard.getStateEpochMapValue(shareCoordinatorKey));
 
-        // apply tombstone
+        CoordinatorResult<DeleteShareGroupStateResponseData, 
CoordinatorRecord> result = shard.deleteState(request);
+
+        // Apply tombstone.
         shard.replay(0L, 0L, (short) 0, result.records().get(0));
 
         DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
@@ -1039,7 +1039,7 @@ class ShareCoordinatorShardTest {
     }
 
     @Test
-    public void testDeleteStateFirstRecordDeleteSuccess() {
+    public void testDeleteStateUnintializedRecord() {
         ShareCoordinatorShard shard = new 
ShareCoordinatorShardBuilder().build();
 
         SharePartitionKey shareCoordinatorKey = 
SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
@@ -1057,21 +1057,10 @@ class ShareCoordinatorShardTest {
         assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
         assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
 
-        // apply tombstone
-        shard.replay(0L, 0L, (short) 0, result.records().get(0));
-
         DeleteShareGroupStateResponseData expectedData = 
DeleteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
-        List<CoordinatorRecord> expectedRecords = List.of(
-            ShareCoordinatorRecordHelpers.newShareStateTombstoneRecord(
-                GROUP_ID, TOPIC_ID, PARTITION)
-        );
 
         assertEquals(expectedData, result.response());
-        assertEquals(expectedRecords, result.records());
-
-        assertNull(shard.getShareStateMapValue(shareCoordinatorKey));
-        assertNull(shard.getLeaderMapValue(shareCoordinatorKey));
-        assertNull(shard.getStateEpochMapValue(shareCoordinatorKey));
+        assertEquals(List.of(), result.records());
     }
 
     @Test


Reply via email to