This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cdbb8fa4cec KAFKA-18629: Account for existing deleting topics in share 
group delete. (#19463)
cdbb8fa4cec is described below

commit cdbb8fa4cec2734e653adf7f8ca36f7b9d943c11
Author: Sushant Mahajan <[email protected]>
AuthorDate: Tue Apr 15 20:24:09 2025 +0530

    KAFKA-18629: Account for existing deleting topics in share group delete. 
(#19463)
    
    * When deleting share groups, only the initialized and initializing
    information in the metadata record is considered. However, the
    deleting topics may also contain information due to other RPCs
    (share group offsets delete).
    * We need to account for existing information while writing the metadata
    record in the delete flow.
    * This PR implements that handling. New tests have been added
    to verify the functionality.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  37 +++++-
 .../group/GroupMetadataManagerTest.java            | 134 ++++++++++++++++++++-
 2 files changed, 167 insertions(+), 4 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 2e2b02f9432..5ec1ab1d75e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2968,7 +2968,8 @@ public class GroupMetadataManager {
             )).build();
     }
 
-    private void addInitializingTopicsRecords(String groupId, 
List<CoordinatorRecord> records, Map<Uuid, Set<Integer>> topicPartitionMap) {
+    // Visibility for tests
+    void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> 
records, Map<Uuid, Set<Integer>> topicPartitionMap) {
         if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
             return;
         }
@@ -2982,12 +2983,23 @@ public class GroupMetadataManager {
         // We must combine the existing information in the record with the 
topicPartitionMap argument.
         Map<Uuid, Set<Integer>> finalInitializingMap = 
mergeShareGroupInitMaps(currentMap.initializingTopics(), topicPartitionMap);
 
+        // If any initializing topics are also present in the deleting state
+        // we should remove them from deleting.
+        Set<Uuid> currentDeleting = new HashSet<>(currentMap.deletingTopics());
+        if (!currentDeleting.isEmpty()) {
+            finalInitializingMap.keySet().forEach(key -> {
+                if (currentDeleting.remove(key)) {
+                    log.warn("Initializing topic {} for share group {} found 
in deleting state as well, removing from deleting.", 
metadataImage.topics().getTopic(key).name(), groupId);
+                }
+            });
+        }
+
         records.add(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
                 attachTopicName(finalInitializingMap),
                 attachTopicName(currentMap.initializedTopics()),
-                attachTopicName(currentMap.deletingTopics())
+                attachTopicName(currentDeleting)
             )
         );
     }
@@ -8166,6 +8178,24 @@ public class GroupMetadataManager {
             shareGroupPartitionMetadata.get(shareGroupId).initializingTopics()
         );
 
+        // Ideally the deleting should be empty - if it is not then it implies
+        // that some previous share group delete or delete offsets command
+        // did not complete successfully. So, set up the delete request such 
that
+        // a retry for the same is possible. Since this is part of an admin 
operation
+        // retrying delete should not pose issues related to
+        // performance. Also, the share coordinator is idempotent on delete 
partitions.
+        Map<Uuid, Set<Integer>> deletingTopics = 
shareGroupPartitionMetadata.get(shareGroupId).deletingTopics().stream()
+            .map(tid -> Map.entry(tid, 
metadataImage.topics().getTopic(tid).partitions().keySet()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+        if (!deletingTopics.isEmpty()) {
+            log.info("Existing deleting entries found in share group {} - {}", 
shareGroupId, deletingTopics);
+            deleteCandidates = mergeShareGroupInitMaps(
+                deleteCandidates,
+                deletingTopics
+            );
+        }
+
         if (deleteCandidates.isEmpty()) {
             return Optional.empty();
         }
@@ -8181,7 +8211,8 @@ public class GroupMetadataManager {
             ));
         }
 
-        // Remove all initializing and initialized topic info from record and 
add deleting.
+        // Remove all initializing and initialized topic info from record and 
add deleting. There
+        // could be previous deleting topics due to offsets delete, we need to 
account for them as well.
         
records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
             shareGroupId,
             Map.of(),
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 4665e81e9be..459a3f4cbc4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -129,6 +129,7 @@ import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.server.authorizer.Action;
 import org.apache.kafka.server.authorizer.AuthorizationResult;
 import org.apache.kafka.server.authorizer.Authorizer;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
 import 
org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
 import org.apache.kafka.server.share.persister.PartitionIdData;
@@ -20744,7 +20745,7 @@ public class GroupMetadataManagerTest {
     }
 
     @Test
-    public void testShareGroupDeleteRequest() {
+    public void testShareGroupDeleteRequestNoDeletingTopics() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
         assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
@@ -20807,6 +20808,74 @@ public class GroupMetadataManagerTest {
         assertRecordsEquals(expectedRecords, records);
     }
 
+    @Test
+    public void testShareGroupDeleteRequestWithAlreadyDeletingTopics() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .build();
+
+        Uuid t1Uuid = Uuid.randomUuid();
+        Uuid t2Uuid = Uuid.randomUuid();
+        Uuid t3Uuid = Uuid.randomUuid();
+        String t1Name = "t1";
+        String t2Name = "t2";
+        String t3Name = "t3";
+
+        String groupId = "share-group";
+        ShareGroup shareGroup = mock(ShareGroup.class);
+        when(shareGroup.groupId()).thenReturn(groupId);
+        when(shareGroup.isEmpty()).thenReturn(false);
+
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(t1Uuid, t1Name, 2)
+            .addTopic(t2Uuid, t2Name, 2)
+            .addTopic(t3Uuid, t3Name, 2)
+            .build();
+
+        MetadataDelta delta = new MetadataDelta(image);
+        context.groupMetadataManager.onNewMetadataImage(image, delta);
+
+        
context.replay(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 
0));
+
+        context.replay(
+            
GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(
+                groupId,
+                Map.of(t1Uuid, Map.entry(t1Name, Set.of(0, 1))),
+                Map.of(t2Uuid, Map.entry(t2Name, Set.of(0, 1))),
+                Map.of(t3Uuid, t3Name)
+            )
+        );
+
+        context.commit();
+
+        Map<Uuid, Set<Integer>> expectedTopicPartitionMap = Map.of(
+            t1Uuid, Set.of(0, 1),
+            t2Uuid, Set.of(0, 1),
+            t3Uuid, Set.of(0, 1)
+        );
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            newShareGroupStatePartitionMetadataRecord(
+                groupId,
+                Map.of(),
+                Map.of(),
+                Map.of(t1Uuid, t1Name, t2Uuid, t2Name, t3Uuid, t3Name)  // 
Existing deleting topics should be included here.
+            )
+        );
+
+        List<CoordinatorRecord> records = new ArrayList<>();
+        Optional<DeleteShareGroupStateParameters> params = 
context.groupMetadataManager.shareGroupBuildPartitionDeleteRequest(groupId, 
records);
+        verifyShareGroupDeleteRequest(
+            params,
+            expectedTopicPartitionMap,
+            groupId,
+            true
+        );
+        assertRecordsEquals(expectedRecords, records);
+    }
+
     @Test
     public void testSharePartitionsEligibleForOffsetDeletionSuccess() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
@@ -21351,6 +21420,69 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testShareGroupInitializingClearsCommonDeleting() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build();
+
+        Uuid t1Uuid = Uuid.randomUuid();
+        String t1Name = "t1";
+        MetadataImage image = new MetadataImageBuilder()
+            .addTopic(t1Uuid, t1Name, 2)
+            .build();
+
+        String groupId = "share-group";
+
+        context.groupMetadataManager.onNewMetadataImage(image, 
mock(MetadataDelta.class));
+        context.groupMetadataManager.replay(
+            new ShareGroupMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupMetadataValue()
+                .setEpoch(0)
+        );
+
+        // Replay a deleting record.
+        context.groupMetadataManager.replay(
+            new ShareGroupStatePartitionMetadataKey()
+                .setGroupId(groupId),
+            new ShareGroupStatePartitionMetadataValue()
+                .setInitializingTopics(List.of())
+                .setInitializedTopics(List.of())
+                .setDeletingTopics(List.of(
+                    new ShareGroupStatePartitionMetadataValue.TopicInfo()
+                        .setTopicId(t1Uuid)
+                        .setTopicName(t1Name)
+                ))
+        );
+
+        List<CoordinatorRecord> records = new ArrayList<>();
+        context.groupMetadataManager.addInitializingTopicsRecords(groupId, 
records, Map.of(t1Uuid, Set.of(0, 1)));
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            CoordinatorRecord.record(
+                new ShareGroupStatePartitionMetadataKey()
+                    .setGroupId(groupId),
+                new ApiMessageAndVersion(
+                    new ShareGroupStatePartitionMetadataValue()
+                        .setInitializingTopics(List.of(
+                            new 
ShareGroupStatePartitionMetadataValue.TopicPartitionsInfo()
+                                .setTopicId(t1Uuid)
+                                .setTopicName(t1Name)
+                                .setPartitions(List.of(0, 1))
+                        ))
+                        .setInitializedTopics(List.of())
+                        .setDeletingTopics(List.of()),
+                    (short) 0
+                )
+            )
+        );
+
+        assertEquals(expectedRecords, records);
+    }
+
     @Test
     public void testShareGroupInitializeSuccess() {
         String groupId = "groupId";

Reply via email to