This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new f2e629d KAFKA-13394: Topic IDs should be removed from
PartitionFetchState if they are no longer sent by the controller (#11459)
f2e629d is described below
commit f2e629dcd3e2dfc420937bb82d58d3f25ea69714
Author: Justine Olshan <[email protected]>
AuthorDate: Thu Nov 18 08:30:40 2021 -0800
KAFKA-13394: Topic IDs should be removed from PartitionFetchState if they
are no longer sent by the controller (#11459)
With KAFKA-13102, we added topic IDs to the InitialFetchState and the
PartitionFetchState in order to send fetch requests using topic IDs when IBP is
3.1.
However, there are some cases where we could initially send topic IDs from
the controller and then no longer do so (controller changes to an IBP <
2.8). If we do not remove from the PartitionFetchState and one broker is still
IBP 3.1, it will try to send a version 13 fetch request to brokers that no
longer have topic IDs in the metadata cache. This could leave the cluster in a
state unable to fetch from these partitions.
This patch removes the topic IDs from the PartitionFetchState if the log
contains a topic ID but the request does not. This means that we will always
handle a leader and isr request if there is no ID in the request but an ID in
the log.
Such a state should be transient because we are either
* upgrading the cluster and somehow switched between a new IBP controller
and an old one --> and will eventually have all new IBP controllers/brokers.
* downgrading the cluster --> will eventually have all old IBP
controllers/brokers and will restart the broker/delete the partition metadata
file for them.
Reviewers: David Jacot <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 8 +++
.../kafka/server/FetchRequestTestDowngrade.scala | 81 ++++++++++++++++++++++
.../unit/kafka/server/ReplicaManagerTest.scala | 22 +++---
3 files changed, 103 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 991a8c5..9f8d923 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1393,6 +1393,14 @@ class ReplicaManager(val config: KafkaConfig,
if (partitionState.leader != localBrokerId)
topicIdUpdateFollowerPartitions.add(partition)
Errors.NONE
+ case None if logTopicId.isDefined && partitionState.leader
!= localBrokerId =>
+ // If we have a topic ID in the log but not in the
request, we must have previously had topic IDs but
+ // are now downgrading. If we are a follower, remove the
topic ID from the PartitionFetchState.
+ stateChangeLogger.info(s"Updating PartitionFetchState for
$topicPartition to remove log topic ID " +
+ s"${logTopicId.get} since LeaderAndIsr request from
controller $controllerId with correlation " +
+ s"id $correlationId epoch $controllerEpoch did not
contain a topic ID")
+ topicIdUpdateFollowerPartitions.add(partition)
+ Errors.NONE
case _ =>
stateChangeLogger.info(s"Ignoring LeaderAndIsr request
from " +
s"controller $controllerId with correlation id
$correlationId " +
diff --git
a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
new file mode 100644
index 0000000..148e076
--- /dev/null
+++
b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.server
+
+import java.time.Duration
+import java.util.Arrays.asList
+
+import kafka.api.{ApiVersion, KAFKA_2_7_IV0, KAFKA_3_1_IV0}
+import kafka.server.{BaseRequestTest, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.ZkVersion
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.collection.{Map, Seq}
+
+class FetchRequestTestDowngrade extends BaseRequestTest {
+
+ override def brokerCount: Int = 2
+ override def generateConfigs: Seq[KafkaConfig] = {
+ // Controller should start with newer IBP and downgrade to the older
one.
+ Seq(
+ createConfig(0, KAFKA_3_1_IV0),
+ createConfig(1, KAFKA_2_7_IV0)
+ )
+ }
+
+ @Test
+ def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(): Unit = {
+ val tp = new TopicPartition("topic", 0)
+ val producer = createProducer()
+ val consumer = createConsumer()
+
+ ensureControllerIn(Seq(0))
+ assertEquals(0, controllerSocketServer.config.brokerId)
+ val partitionLeaders = createTopic(tp.topic, Map(tp.partition ->
Seq(1, 0)))
+ TestUtils.waitForAllPartitionsMetadata(servers, tp.topic, 1)
+ ensureControllerIn(Seq(1))
+ assertEquals(1, controllerSocketServer.config.brokerId)
+
+ assertEquals(1, partitionLeaders(0))
+
+ val record1 = new ProducerRecord(tp.topic, tp.partition, null,
"key".getBytes, "value".getBytes)
+ producer.send(record1)
+
+ consumer.assign(asList(tp))
+ val count = consumer.poll(Duration.ofMillis(5000)).count()
+ assertEquals(1, count)
+ }
+
+ private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
+ while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
+ zkClient.deleteController(ZkVersion.MatchAnyVersion)
+ TestUtils.waitUntilControllerElected(zkClient)
+ }
+ }
+
+ private def createConfig(nodeId: Int, interBrokerVersion: ApiVersion):
KafkaConfig = {
+ val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
+ props.put(KafkaConfig.InterBrokerProtocolVersionProp,
interBrokerVersion.version)
+ KafkaConfig.fromProps(props)
+ }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9f3776f..2b1a397 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3501,8 +3501,9 @@ class ReplicaManagerTest {
assertEquals(expectedTopicId, fetchState.get.topicId)
}
- @Test
- def testPartitionFetchStateUpdatesWithTopicIdAdded(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testPartitionFetchStateUpdatesWithTopicIdChanges(startsWithTopicId:
Boolean): Unit = {
val aliveBrokersIds = Seq(0, 1)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time),
brokerId = 0, aliveBrokersIds)
@@ -3510,18 +3511,23 @@ class ReplicaManagerTest {
val tp = new TopicPartition(topic, 0)
val leaderAndIsr = new LeaderAndIsr(1, 0, aliveBrokersIds.toList, 0)
- val leaderAndIsrRequest1 = leaderAndIsrRequest(Uuid.ZERO_UUID, tp,
aliveBrokersIds, leaderAndIsr)
+ // This test either starts with a topic ID in the PartitionFetchState
and removes it on the next request (startsWithTopicId)
+ // or does not start with a topic ID in the PartitionFetchState and adds
one on the next request (!startsWithTopicId)
+ val startingId = if (startsWithTopicId) topicId else Uuid.ZERO_UUID
+ val startingIdOpt = if (startsWithTopicId) Some(topicId) else None
+ val leaderAndIsrRequest1 = leaderAndIsrRequest(startingId, tp,
aliveBrokersIds, leaderAndIsr)
val leaderAndIsrResponse1 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest1, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse1.error)
- assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp, None)
+ assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp,
startingIdOpt)
- val leaderAndIsrRequest2 = leaderAndIsrRequest(topicId, tp,
aliveBrokersIds, leaderAndIsr)
+ val endingId = if (!startsWithTopicId) topicId else Uuid.ZERO_UUID
+ val endingIdOpt = if (!startsWithTopicId) Some(topicId) else None
+ val leaderAndIsrRequest2 = leaderAndIsrRequest(endingId, tp,
aliveBrokersIds, leaderAndIsr)
val leaderAndIsrResponse2 = replicaManager.becomeLeaderOrFollower(0,
leaderAndIsrRequest2, (_, _) => ())
assertEquals(Errors.NONE, leaderAndIsrResponse2.error)
- assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp,
Some(topicId))
-
+ assertFetcherHasTopicId(replicaManager.replicaFetcherManager, tp,
endingIdOpt)
} finally {
replicaManager.shutdown(checkpointHW = false)
}
@@ -3549,7 +3555,7 @@ class ReplicaManagerTest {
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ ==
partition.log.get.dir.getParentFile).size)
// Append a couple of messages.
- for (i <- 1 to 40) {
+ for (i <- 1 to 500) {
val records = TestUtils.singletonRecords(s"message $i".getBytes)
appendRecords(replicaManager, tp, records).onFire { response =>
assertEquals(Errors.NONE, response.error)