This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new b2a9dfd KAFKA-13488: Producer fails to recover if topic gets deleted
midway (#11552)
b2a9dfd is described below
commit b2a9dfd5a0b81ab1b3eb54b89d06c79a307d9c20
Author: Prateek Agarwal <[email protected]>
AuthorDate: Thu Dec 16 20:14:39 2021 +0530
KAFKA-13488: Producer fails to recover if topic gets deleted midway (#11552)
Allow the leader epoch to be re-assigned to the new value from the Metadata
response if `oldTopicId` is not present in the cache. This is needed because,
when a topic gets deleted, `oldTopicId` is removed from the cache but the last
seen leader epoch is not. Hence, metadata for the newly recreated topic (whose
leader epoch may be lower than the stale cached one) won't be accepted unless
we allow `oldTopicId` to be null.
Reviewers: Jason Gustafson <[email protected]>, David Jacot
<[email protected]>
---
.../java/org/apache/kafka/clients/Metadata.java | 11 +--
.../org/apache/kafka/clients/MetadataTest.java | 34 +++++++--
.../kafka/api/ProducerSendWhileDeletionTest.scala | 83 ++++++++++++++++++++++
3 files changed, 119 insertions(+), 9 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index e4f1756..30ae642 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -393,10 +393,13 @@ public class Metadata implements Closeable {
if (hasReliableLeaderEpoch &&
partitionMetadata.leaderEpoch.isPresent()) {
int newEpoch = partitionMetadata.leaderEpoch.get();
Integer currentEpoch = lastSeenLeaderEpochs.get(tp);
- if (topicId != null && oldTopicId != null &&
!topicId.equals(oldTopicId)) {
- // If both topic IDs were valid and the topic ID changed,
update the metadata
- log.debug("Topic ID for partition {} changed from {} to {}, so
this topic must have been recreated. " +
- "Resetting the last seen epoch to {}.", tp,
oldTopicId, topicId, newEpoch);
+ if (topicId != null && !topicId.equals(oldTopicId)) {
+ // If the new topic ID is valid and different from the last
seen topic ID, update the metadata.
+ // Between the time that a topic is deleted and re-created,
the client may lose track of the
+ // corresponding topicId (i.e. `oldTopicId` will be null). In
this case, when we discover the new
+ // topicId, we allow the corresponding leader epoch to
override the last seen value.
+ log.info("Resetting the last seen epoch of partition {} to {}
since the associated topicId changed from {} to {}",
+ tp, newEpoch, oldTopicId, topicId);
lastSeenLeaderEpochs.put(tp, newEpoch);
return Optional.of(partitionMetadata);
} else if (currentEpoch == null || newEpoch >= currentEpoch) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5b560ad..a51e275 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -380,6 +380,31 @@ public class MetadataTest {
}
@Test
+ public void testEpochUpdateAfterTopicDeletion() {
+ TopicPartition tp = new TopicPartition("topic-1", 0);
+
+ MetadataResponse metadataResponse = emptyMetadataResponse();
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
+
+ // Start with a Topic topic-1 with a random topic ID
+ Map<String, Uuid> topicIds = Collections.singletonMap("topic-1",
Uuid.randomUuid());
+ metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10,
topicIds);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+ // topic-1 is now deleted, so the response contains an error.
The last seen leader epoch should still retain the old value.
+ metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.singletonMap("topic-1", Errors.UNKNOWN_TOPIC_OR_PARTITION),
Collections.emptyMap());
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+
+ // Create topic-1 again, but this time with a different topic ID.
The leader epoch should be updated to the new value even if it is lower.
+ Map<String, Uuid> newTopicIds = Collections.singletonMap("topic-1",
Uuid.randomUuid());
+ metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 5,
newTopicIds);
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
+ assertEquals(Optional.of(5), metadata.lastSeenLeaderEpoch(tp));
+ }
+
+ @Test
public void testEpochUpdateOnChangedTopicIds() {
TopicPartition tp = new TopicPartition("topic-1", 0);
Map<String, Uuid> topicIds = Collections.singletonMap("topic-1",
Uuid.randomUuid());
@@ -388,13 +413,12 @@ public class MetadataTest {
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 0L);
// Start with a topic with no topic ID
- metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10);
+ metadataResponse = RequestTestUtils.metadataUpdateWith("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 100);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 1L);
- assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
+ assertEquals(Optional.of(100), metadata.lastSeenLeaderEpoch(tp));
- // We should treat an added topic ID as though it is the same topic.
Handle only when epoch increases.
- // Don't update to an older one
- metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 1,
topicIds);
+ // If the older topic ID is null, we should accept the leader epoch that
arrives with the new topic ID, even if it is lower than the last seen one
+ metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1,
Collections.emptyMap(), Collections.singletonMap("topic-1", 1), _tp -> 10,
topicIds);
metadata.updateWithCurrentRequestVersion(metadataResponse, false, 2L);
assertEquals(Optional.of(10), metadata.lastSeenLeaderEpoch(tp));
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
new file mode 100644
index 0000000..ec05bb2
--- /dev/null
+++
b/core/src/test/scala/integration/kafka/api/ProducerSendWhileDeletionTest.scala
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.charset.StandardCharsets
+
+
+class ProducerSendWhileDeletionTest extends IntegrationTestHarness {
+ val producerCount: Int = 1
+ val brokerCount: Int = 2
+
+ serverConfig.put(KafkaConfig.NumPartitionsProp, 2.toString)
+ serverConfig.put(KafkaConfig.DefaultReplicationFactorProp, 2.toString)
+ serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+
+ producerConfig.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000L.toString)
+ producerConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
10000.toString)
+ producerConfig.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
10000.toString)
+
+ /**
+ * Tests that the producer recovers on its own when a topic is deleted midway
through producing.
+ *
+ * Producer will attempt to send messages to the partition specified in
each record, and should
+ * succeed as long as the partition is included in the metadata.
+ */
+ @Test
+ def testSendWithTopicDeletionMidWay(): Unit = {
+ val numRecords = 10
+ val topic = "topic"
+
+ // Create topic with leader as 0 for the 2 partitions.
+ createTopic(topic, Map(0 -> Seq(0, 1), 1 -> Seq(0, 1)))
+
+ val reassignment = Map(
+ new TopicPartition(topic, 0) -> Seq(1, 0),
+ new TopicPartition(topic, 1) -> Seq(1, 0)
+ )
+
+ // Change leader to 1 for both the partitions to increase leader epoch
from 0 -> 1
+ zkClient.createPartitionReassignment(reassignment)
+ TestUtils.waitUntilTrue(() => !zkClient.reassignPartitionsInProgress,
+ "failed to remove reassign partitions path after completion")
+
+ val producer = createProducer()
+
+ (1 to numRecords).foreach { i =>
+ val resp = producer.send(new ProducerRecord(topic, null, ("value"
+ i).getBytes(StandardCharsets.UTF_8))).get
+ assertEquals(topic, resp.topic())
+ }
+
+ // Start topic deletion
+ adminZkClient.deleteTopic(topic)
+
+ // Verify that the topic is deleted when no metadata request comes in
+ TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+
+ // Producer should be able to send messages even after topic gets
deleted and auto-created
+ assertEquals(topic, producer.send(new ProducerRecord(topic, null,
"value".getBytes(StandardCharsets.UTF_8))).get.topic())
+ }
+
+}