This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new d02028d773d MINOR: Code change to prevent NPE due to share delete. (#20092)
d02028d773d is described below

commit d02028d773d6de40cb7a693c1ae423b59975d802
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Jul 2 21:46:18 2025 +0530

    MINOR: Code change to prevent NPE due to share delete. (#20092)
    
    * The `GroupCoordinatorService.onPartitionsDeleted` and
    `GroupMetadataManager.shareGroupBuildPartitionDeleteRequest` code looks
    up the metadata image to find topic names/partitions for topic ids.
    * If a topic id is not present in the image, the lookup returns null and
    the chained call throws an NPE, crashing the coordinator (see the
    sketch after this message).
    * This PR guards those lookups to solve the issue.
    
    Reviewers: Andrew Schofield <[email protected]>
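
For illustration, a minimal sketch of the failure mode and of the null
guard this change applies (simplified from the diff below; identifiers
are taken from the patched code):

    // TopicsImage.getTopic returns null when a topic is absent from
    // the image, so chaining a call onto the result throws an NPE.
    TopicImage image = metadataImage.topics().getTopic(tp.topic());
    Uuid topicId = image.id(); // NPE if the topic is already gone

    // Null-safe variant: skip topics missing from the image.
    if (image != null) {
        topicIds.add(image.id());
    }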
---
 .../coordinator/group/GroupCoordinatorService.java | 17 ++++++++++----
 .../coordinator/group/GroupMetadataManager.java    | 26 +++++++++++++---------
 2 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 099201ecabb..4e0e03265a6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -96,6 +96,7 @@ import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
 import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.server.record.BrokerCompressionType;
@@ -2137,10 +2138,18 @@ public class GroupCoordinatorService implements GroupCoordinator {
         ).get();
 
         // At this point the metadata will not have been updated
-        // with the deleted topics.
-        Set<Uuid> topicIds = topicPartitions.stream()
-            .map(tp -> metadataImage.topics().getTopic(tp.topic()).id())
-            .collect(Collectors.toSet());
+        // with the deleted topics, but we must guard against it.
+        if (metadataImage == null || metadataImage.equals(MetadataImage.EMPTY)) {
+            return;
+        }
+
+        Set<Uuid> topicIds = new HashSet<>();
+        for (TopicPartition tp : topicPartitions) {
+            TopicImage image = metadataImage.topics().getTopic(tp.topic());
+            if (image != null) {
+                topicIds.add(image.id());
+            }
+        }
 
         CompletableFuture.allOf(
             FutureUtils.mapExceptionally(
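
For comparison, the guarded loop above could also be written as a
null-safe stream, mirroring the style used in GroupMetadataManager
below (a sketch, not part of this commit; it assumes java.util.Objects
is imported):

    Set<Uuid> topicIds = topicPartitions.stream()
        .map(tp -> metadataImage.topics().getTopic(tp.topic()))
        .filter(Objects::nonNull)
        .map(TopicImage::id)
        .collect(Collectors.toSet());
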
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9c9500485b1..ca47af5d4a3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -7977,19 +7977,21 @@ public class GroupMetadataManager {
         // a retry for the same is possible. Since this is part of an admin operation
         // retrying delete should not pose issues related to
         // performance. Also, the share coordinator is idempotent on delete partitions.
-        Map<Uuid, InitMapValue> deletingTopics = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics().stream()
-            .map(tid -> {
-                TopicImage image = metadataImage.topics().getTopic(tid);
-                return Map.entry(tid, new InitMapValue(image.name(), image.partitions().keySet(), -1));
-            })
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        Set<Uuid> deletingCurrent = shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics();
+        if (metadataImage != null && !metadataImage.equals(MetadataImage.EMPTY)) {
+            Map<Uuid, InitMapValue> deletingTopics = deletingCurrent.stream()
+                .map(tid -> metadataImage.topics().getTopic(tid))
+                .filter(Objects::nonNull)
+                .map(image -> Map.entry(image.id(), new InitMapValue(image.name(), image.partitions().keySet(), -1)))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-        if (!deletingTopics.isEmpty()) {
-            log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics);
-            deleteCandidates = combineInitMaps(deleteCandidates, deletingTopics);
+            if (!deletingTopics.isEmpty()) {
+                log.info("Existing deleting entries found in share group {} - {}", shareGroupId, deletingTopics);
+                deleteCandidates = combineInitMaps(deleteCandidates, deletingTopics);
+            }
         }
 
-        if (deleteCandidates.isEmpty()) {
+        if (deleteCandidates.isEmpty() && deletingCurrent.isEmpty()) {
             return Optional.empty();
         }
 
@@ -8013,6 +8015,10 @@ public class GroupMetadataManager {
             attachTopicName(deleteCandidates.keySet())
         ));
 
+        if (topicDataList.isEmpty()) {
+            return Optional.empty();
+        }
+
         return Optional.of(new DeleteShareGroupStateParameters.Builder()
             .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdData>()
                 .setGroupId(shareGroupId)

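Taken together, the change makes share-group delete resilient to topics
that have already left the metadata image: it short-circuits when the
image is null or EMPTY, drops topic ids with no image entry, and returns
Optional.empty() when no per-topic data remains, so the delete becomes a
no-op instead of an NPE. Because the share coordinator's delete is
idempotent, a later retry of the admin operation remains safe.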