This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new b80aa15c17e KAFKA-19383: Handle the deleted topics when applying
ClearElrRecord (#20033)
b80aa15c17e is described below
commit b80aa15c17e0191d9c3bb087fae48046c7d74043
Author: Calvin Liu <[email protected]>
AuthorDate: Tue Jun 24 17:04:45 2025 -0700
KAFKA-19383: Handle the deleted topics when applying ClearElrRecord (#20033)
https://issues.apache.org/jira/browse/KAFKA-19383 When applying the
ClearElrRecord, it may pick up the topicId in the image without checking
if the topic has been deleted. This can cause the creation of a new
TopicRecord with an old topic ID.
Reviewers: Alyssa Huang <[email protected]>, Artem Livshits
<[email protected]>, Colin P. McCabe <[email protected]>
No conflicts.
---
.../java/org/apache/kafka/image/TopicsDelta.java | 33 ++-
.../org/apache/kafka/image/TopicsImageTest.java | 223 +++++++++++++++++++++
2 files changed, 248 insertions(+), 8 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index b6deed5281a..e25bf71f05a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -95,26 +95,43 @@ public final class TopicsDelta {
topicDelta.replay(record);
}
+ private void maybeReplayClearElrRecord(Uuid topicId, ClearElrRecord
record) {
+ // Only apply the record if the topic is not deleted.
+ if (!deletedTopicIds.contains(topicId)) {
+ TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
+ topicDelta.replay(record);
+ }
+ }
+
+ // When replaying the ClearElrRecord, we need to first find the latest
topic ID associated with the topic(s) because
+ // multiple topic IDs for the same topic in a TopicsDelta is possible in
the event of topic deletion and recreation.
+ // Second, we should not add the topicDelta if the given topic ID has been
deleted. So that we don't leak the
+ // deleted topic ID.
public void replay(ClearElrRecord record) {
if (!record.topicName().isEmpty()) {
- Uuid topicId;
- if (image.getTopic(record.topicName()) != null) {
- topicId = image.getTopic(record.topicName()).id();
- } else {
+ Uuid topicId = null;
+ // CreatedTopics contains the latest topic IDs. It should be
checked first in case the topic is deleted and
+ // created in the same batch.
+ if (createdTopics.containsKey(record.topicName())) {
topicId = createdTopics.get(record.topicName());
+ } else if (image.getTopic(record.topicName()) != null) {
+ topicId = image.getTopic(record.topicName()).id();
}
+
if (topicId == null) {
throw new RuntimeException("Unable to clear elr for topic with
name " +
record.topicName() + ": no such topic found.");
}
- TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
- topicDelta.replay(record);
+
+ maybeReplayClearElrRecord(topicId, record);
} else {
// Update all the existing topics
image.topicsById().forEach((topicId, image) -> {
- TopicDelta topicDelta = getOrCreateTopicDelta(topicId);
- topicDelta.replay(record);
+ maybeReplayClearElrRecord(topicId, record);
});
+ createdTopicIds().forEach((topicId -> {
+ maybeReplayClearElrRecord(topicId, record);
+ }));
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 1fb44875c45..e5666156232 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -428,6 +428,229 @@ public class TopicsImageTest {
assertEquals(0,
image.getTopic(barId).partitions().get(0).lastKnownElr.length);
}
+ @Test
+ public void testClearElrRecordOnNonExistingTopic() {
+ TopicsImage image = TopicsImage.EMPTY;
+
+ List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+ topicRecords.addAll(List.of(
+ new ApiMessageAndVersion(
+ new ClearElrRecord().setTopicName("foo"),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ))
+ );
+ TopicsDelta delta = new TopicsDelta(image);
+ assertThrows(RuntimeException.class, () ->
RecordTestUtils.replayAll(delta, topicRecords));
+ }
+
+ @Test
+ public void testClearElrRecords_All_ForDeletedTopics() {
+ Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+ Uuid fooId2 = Uuid.randomUuid();
+ Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+ Uuid barId2 = Uuid.randomUuid();
+
+ List<TopicImage> topics = new ArrayList<>();
+ topics.add(
+ newTopicImage(
+ "foo",
+ fooId,
+ newPartition(new int[] {0, 1, 2, 3})
+ )
+ );
+ TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
+ newTopicsByNameMap(topics));
+
+ List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+ topicRecords.add(
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(fooId).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(List.of(1, 2, 3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ )
+ );
+
+ TopicsDelta delta = new TopicsDelta(image);
+ RecordTestUtils.replayAll(delta, topicRecords);
+ image = delta.apply();
+
+ topicRecords = new ArrayList<>();
+ /* Test the following:
+ 1. Topic foo is deleted and created in the same delta, the clear
elr applies on the new topic
+ 2. Topic bar is created, deleted, then created in the same delta,
the clear elr applies on the new topic
+ */
+ topicRecords.addAll(List.of(
+ new ApiMessageAndVersion(
+ new RemoveTopicRecord().setTopicId(fooId),
+ REMOVE_TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(fooId2).
+ setName("foo"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
+ setIsr(List.of(0, 1)).
+ setEligibleLeaderReplicas(List.of(2)).
+ setLastKnownElr(List.of(3)),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(barId).
+ setName("bar"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(barId).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(List.of(1, 2, 3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new RemoveTopicRecord().setTopicId(barId),
+ REMOVE_TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(barId2).
+ setName("bar"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(barId2).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(List.of(1)).
+ setEligibleLeaderReplicas(List.of(2)).
+ setLastKnownElr(List.of(3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new ClearElrRecord(),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ))
+ );
+ delta = new TopicsDelta(image);
+ RecordTestUtils.replayAll(delta, topicRecords);
+ image = delta.apply();
+ assertEquals(2, image.topicsById().size());
+ assertEquals(2, image.topicsByName().size());
+
+ assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
+ assertEquals(0,
image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
+ assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
+ assertEquals(0,
image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
+ }
+
+ @Test
+ public void testClearElrRecords_Single_ForDeletedTopics() {
+ Uuid fooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+ Uuid fooId2 = Uuid.randomUuid();
+ Uuid barId = Uuid.fromString("f62ptyETTjet8SL5ZeREiw");
+ Uuid barId2 = Uuid.randomUuid();
+
+ List<TopicImage> topics = new ArrayList<>();
+ topics.add(
+ newTopicImage(
+ "foo",
+ fooId,
+ newPartition(new int[] {0, 1, 2, 3})
+ )
+ );
+ TopicsImage image = new TopicsImage(newTopicsByIdMap(topics),
+ newTopicsByNameMap(topics));
+
+ List<ApiMessageAndVersion> topicRecords = new ArrayList<>();
+ topicRecords.add(
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(fooId).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(List.of(1, 2, 3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ )
+ );
+
+ TopicsDelta delta = new TopicsDelta(image);
+ RecordTestUtils.replayAll(delta, topicRecords);
+ image = delta.apply();
+
+ topicRecords = new ArrayList<>();
+ /* Test the following:
+ 1. Topic foo is deleted and created in the same delta, the clear
elr applies on the new topic
+ 2. Topic bar is created, deleted, then created in the same delta,
the clear elr applies on the new topic
+ */
+ topicRecords.addAll(List.of(
+ new ApiMessageAndVersion(
+ new RemoveTopicRecord().setTopicId(fooId),
+ REMOVE_TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(fooId2).
+ setName("foo"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(fooId2).setPartitionId(0).
+ setIsr(List.of(0, 1)).
+ setEligibleLeaderReplicas(List.of(2)).
+ setLastKnownElr(List.of(3)),
+ PARTITION_CHANGE_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(barId).
+ setName("bar"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(barId).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(List.of(1, 2, 3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new RemoveTopicRecord().setTopicId(barId),
+ REMOVE_TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new TopicRecord().setTopicId(barId2).
+ setName("bar"),
+ TOPIC_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new PartitionRecord().setTopicId(barId2).
+ setPartitionId(0).
+ setLeader(0).
+ setIsr(List.of(1)).
+ setEligibleLeaderReplicas(List.of(2)).
+ setLastKnownElr(List.of(3)),
+ PARTITION_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new ClearElrRecord().setTopicName("foo"),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ),
+ new ApiMessageAndVersion(
+ new ClearElrRecord().setTopicName("bar"),
+ CLEAR_ELR_RECORD.highestSupportedVersion()
+ ))
+ );
+ delta = new TopicsDelta(image);
+ RecordTestUtils.replayAll(delta, topicRecords);
+ image = delta.apply();
+ assertEquals(2, image.topicsById().size());
+ assertEquals(2, image.topicsByName().size());
+
+ assertEquals(0, image.getTopic(fooId2).partitions().get(0).elr.length);
+ assertEquals(0,
image.getTopic(fooId2).partitions().get(0).lastKnownElr.length);
+ assertEquals(0, image.getTopic(barId2).partitions().get(0).elr.length);
+ assertEquals(0,
image.getTopic(barId2).partitions().get(0).lastKnownElr.length);
+ }
+
@Test
public void testClearElrRecordForNonExistTopic() {
TopicsImage image = new TopicsImage(newTopicsByIdMap(List.of()),