This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new b80aa15c17e KAFKA-19383: Handle the deleted topics when applying 
ClearElrRecord (#20033)
b80aa15c17e is described below

commit b80aa15c17e0191d9c3bb087fae48046c7d74043
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Jun 24 17:04:45 2025 -0700

    KAFKA-19383: Handle the deleted topics when applying ClearElrRecord (#20033)
    
    https://issues.apache.org/jira/browse/KAFKA-19383 When applying the
    ClearElrRecord, it may pick up the topicId in the image without checking
    if the topic has been deleted. This can cause the creation of a new
    TopicRecord with an old topic ID.
    
    Reviewers: Alyssa Huang <[email protected]>, Artem Livshits 
<[email protected]>, Colin P. McCabe <[email protected]>
    
    No conflicts.
---
 .../java/org/apache/kafka/image/TopicsDelta.java   |  33 ++-
 .../org/apache/kafka/image/TopicsImageTest.java    | 223 +++++++++++++++++++++
 2 files changed, 248 insertions(+), 8 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index b6deed5281a..e25bf71f05a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -95,26 +95,43 @@ public final class TopicsDelta {
         topicDelta.replay(record);
     }
 
+    private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord 
record) {
+        // Only apply the record if the topic is not deleted.
+        if (!deletedTopicIds.contains(topicId)) {
+            TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
+            topicDelta.replay(record);
+        }
+    }
+
+    // When replaying the ClearElrRecord, we need to first find the latest 
topic ID associated with the topic(s) because
+    // multiple topic IDs for the same topic in a TopicsDelta is possible in 
the event of topic deletion and recreation.
+    // Second, we should not add the topicDelta if the given topic ID has been 
deleted. So that we don't leak the
+    // deleted topic ID.
     public void replay(ClearElrRecord record) {
         if (!record.topicName().isEmpty()) {
-            Uuid topicId;
-            if (image.getTopic(record.topicName()) != null) {
-                topicId = image.getTopic(record.topicName()).id();
-            } else {
+            Uuid topicId = null;
+            // CreatedTopics contains the latest topic IDs. It should be 
checked first in case the topic is deleted and
+            // created in the same batch.
+            if (createdTopics.containsKey(record.topicName())) {
                 topicId = createdTopics.get(record.topicName());
+            } else if (image.getTopic(record.topicName()) != null) {
+                topicId = image.getTopic(record.topicName()).id();
             }
+
             if (topicId == null) {
                 throw new RuntimeException("Unable to clear elr for topic with 
name " +
                     record.topicName() + ": no such topic found.");
             }
-            TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
-            topicDelta.replay(record);
+
+            maybeReplayClearElrRecord(topicId, record);
         } else {
             // Update all the existing topics
             image.topicsById().forEach((topicId, image) -> {
-                TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
-                topicDelta.replay(record);
+                maybeReplayClearElrRecord(topicId, record);
             });
+            createdTopicIds().forEach((topicId -> {
+                maybeReplayClearElrRecord(topicId, record);
+            }));
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java 
b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 1fb44875c45..e5666156232 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -428,6 +428,229 @@ public class TopicsImageTest {
         assertEquals(0, 
image.getTopic(barId).partitions().get(0).lastKnownElr.length);
     }
 
+    @Test
+    public void testClearElrRecordOnNonExistingTopic() {
+        TopicsImage image = TopicsImage.EMPTY;
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        topicRecords.addAll(List.of(
+            new ApiMessageAndVersion(
+                new ClearElrRecord().setTopicName("foo"),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ))
+        );
+        TopicsDelta delta = new TopicsDelta(image);
+        assertThrows(RuntimeException.class, () -> 
RecordTestUtils.replayAll(delta, topicRecords));
+    }
+
+    @Test
+    public void testClearElrRecords_All_ForDeletedTopics() {
+        Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+        Uuid fooId2 = Uuid.randomUuid();
+        Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+        Uuid barId2 = Uuid.randomUuid();
+
+        List<TopicImage> topics = new ArrayList<>();
+        topics.add(
+            newTopicImage(
+                "foo",
+                fooId,
+                newPartition(new int[] {0, 1, 2, 3})
+            )
+        );
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
+            newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(fooId).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(List.of(1, 2, 3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+        image = delta.apply();
+
+        topicRecords = new ArrayList<>();
+        /* Test the following:
+            1. Topic foo is deleted and created in the same delta, the clear 
elr applies on the new topic
+            2. Topic bar is created, deleted, then created in the same delta, 
the clear elr applies on the new topic
+        */
+        topicRecords.addAll(List.of(
+            new ApiMessageAndVersion(
+                new RemoveTopicRecord().setTopicId(fooId),
+                REMOVE_TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(fooId2).
+                    setName("foo"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
+                    setIsr(List.of(0, 1)).
+                    setEligibleLeaderReplicas(List.of(2)).
+                    setLastKnownElr(List.of(3)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(barId).
+                    setName("bar"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(barId).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(List.of(1, 2, 3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new RemoveTopicRecord().setTopicId(barId),
+                REMOVE_TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(barId2).
+                    setName("bar"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(barId2).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(List.of(1)).
+                    setEligibleLeaderReplicas(List.of(2)).
+                    setLastKnownElr(List.of(3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new ClearElrRecord(),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ))
+        );
+        delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+        image = delta.apply();
+        assertEquals(2, image.topicsById().size());
+        assertEquals(2, image.topicsByName().size());
+
+        assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
+        assertEquals(0, 
image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
+        assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
+        assertEquals(0, 
image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
+    }
+
+    @Test
+    public void testClearElrRecords_Single_ForDeletedTopics() {
+        Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+        Uuid fooId2 = Uuid.randomUuid();
+        Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+        Uuid barId2 = Uuid.randomUuid();
+
+        List<TopicImage> topics = new ArrayList<>();
+        topics.add(
+            newTopicImage(
+                "foo",
+                fooId,
+                newPartition(new int[] {0, 1, 2, 3})
+            )
+        );
+        TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
+            newTopicsByNameMap(topics));
+
+        List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+        topicRecords.add(
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(fooId).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(List.of(1, 2, 3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            )
+        );
+
+        TopicsDelta delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+        image = delta.apply();
+
+        topicRecords = new ArrayList<>();
+        /* Test the following:
+            1. Topic foo is deleted and created in the same delta, the clear 
elr applies on the new topic
+            2. Topic bar is created, deleted, then created in the same delta, 
the clear elr applies on the new topic
+        */
+        topicRecords.addAll(List.of(
+            new ApiMessageAndVersion(
+                new RemoveTopicRecord().setTopicId(fooId),
+                REMOVE_TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(fooId2).
+                    setName("foo"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
+                    setIsr(List.of(0, 1)).
+                    setEligibleLeaderReplicas(List.of(2)).
+                    setLastKnownElr(List.of(3)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(barId).
+                    setName("bar"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(barId).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(List.of(1, 2, 3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new RemoveTopicRecord().setTopicId(barId),
+                REMOVE_TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new TopicRecord().setTopicId(barId2).
+                    setName("bar"),
+                TOPIC_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new PartitionRecord().setTopicId(barId2).
+                    setPartitionId(0).
+                    setLeader(0).
+                    setIsr(List.of(1)).
+                    setEligibleLeaderReplicas(List.of(2)).
+                    setLastKnownElr(List.of(3)),
+                PARTITION_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new ClearElrRecord().setTopicName("foo"),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ),
+            new ApiMessageAndVersion(
+                new ClearElrRecord().setTopicName("bar"),
+                CLEAR_ELR_RECORD.highestSupportedVersion()
+            ))
+        );
+        delta = new TopicsDelta(image);
+        RecordTestUtils.replayAll(delta, topicRecords);
+        image = delta.apply();
+        assertEquals(2, image.topicsById().size());
+        assertEquals(2, image.topicsByName().size());
+
+        assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
+        assertEquals(0, 
image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
+        assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
+        assertEquals(0, 
image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
+    }
+
     @Test
     public void testClearElrRecordForNonExistTopic() {
         TopicsImage image = new TopicsImage(newTopicsByIdMap(List.of()),

Reply via email to