This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new d02028d773d MINOR: Code change to prevent NPE due to share delete.
(#20092)
d02028d773d is described below
commit d02028d773d6de40cb7a693c1ae423b59975d802
Author: Sushant Mahajan <[email protected]>
AuthorDate: Wed Jul 2 21:46:18 2025 +0530
MINOR: Code change to prevent NPE due to share delete. (#20092)
* The `GroupCoordinatorService.onPartitionsDeleted` and
`GroupMetadataManager.shareGroupBuildPartitionDeleteRequest` code paths look
up topics in the metadata image to resolve their ids, names and partitions.
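* If a topic is not present in the image, the lookup returns null and
dereferencing it throws an NPE, resulting in a crash.
* This PR fixes the issue by skipping topics that are missing from the image
and by guarding against a null or empty metadata image.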
Reviewers: Andrew Schofield <[email protected]>
---
.../coordinator/group/GroupCoordinatorService.java | 17 ++++++++++----
.../coordinator/group/GroupMetadataManager.java | 26 +++++++++++++---------
2 files changed, 29 insertions(+), 14 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 099201ecabb..4e0e03265a6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -96,6 +96,7 @@ import
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.TopicImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
@@ -2137,10 +2138,18 @@ public class GroupCoordinatorService implements
GroupCoordinator {
).get();
// At this point the metadata will not have been updated
- // with the deleted topics.
- Set<Uuid> topicIds = topicPartitions.stream()
- .map(tp -> metadataImage.topics().getTopic(tp.topic()).id())
- .collect(Collectors.toSet());
+ // with the deleted topics, but we must guard against it.
+ if (metadataImage == null ||
metadataImage.equals(MetadataImage.EMPTY)) {
+ return;
+ }
+
+ Set<Uuid> topicIds = new HashSet<>();
+ for (TopicPartition tp : topicPartitions) {
+ TopicImage image = metadataImage.topics().getTopic(tp.topic());
+ if (image != null) {
+ topicIds.add(image.id());
+ }
+ }
CompletableFuture.allOf(
FutureUtils.mapExceptionally(
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9c9500485b1..ca47af5d4a3 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -7977,19 +7977,21 @@ public class GroupMetadataManager {
// a retry for the same is possible. Since this is part of an admin
operation
// retrying delete should not pose issues related to
// performance. Also, the share coordinator is idempotent on delete
partitions.
- Map<Uuid, InitMapValue> deletingTopics =
shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics().stream()
- .map(tid -> {
- TopicImage image = metadataImage.topics().getTopic(tid);
- return Map.entry(tid, new InitMapValue(image.name(),
image.partitions().keySet(), -1));
- })
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Set<Uuid> deletingCurrent =
shareGroupStatePartitionMetadata.get(shareGroupId).deletingTopics();
+ if (metadataImage != null &&
!metadataImage.equals(MetadataImage.EMPTY)) {
+ Map<Uuid, InitMapValue> deletingTopics = deletingCurrent.stream()
+ .map(tid -> metadataImage.topics().getTopic(tid))
+ .filter(Objects::nonNull)
+ .map(image -> Map.entry(image.id(), new
InitMapValue(image.name(), image.partitions().keySet(), -1)))
+ .collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- if (!deletingTopics.isEmpty()) {
- log.info("Existing deleting entries found in share group {} - {}",
shareGroupId, deletingTopics);
- deleteCandidates = combineInitMaps(deleteCandidates,
deletingTopics);
+ if (!deletingTopics.isEmpty()) {
+ log.info("Existing deleting entries found in share group {} -
{}", shareGroupId, deletingTopics);
+ deleteCandidates = combineInitMaps(deleteCandidates,
deletingTopics);
+ }
}
- if (deleteCandidates.isEmpty()) {
+ if (deleteCandidates.isEmpty() && deletingCurrent.isEmpty()) {
return Optional.empty();
}
@@ -8013,6 +8015,10 @@ public class GroupMetadataManager {
attachTopicName(deleteCandidates.keySet())
));
+ if (topicDataList.isEmpty()) {
+ return Optional.empty();
+ }
+
return Optional.of(new DeleteShareGroupStateParameters.Builder()
.setGroupTopicPartitionData(new
GroupTopicPartitionData.Builder<PartitionIdData>()
.setGroupId(shareGroupId)