This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 47c5b001be1deea5a4542e52baf0738def36b430 Author: David Arthur <[email protected]> AuthorDate: Wed Dec 14 08:37:06 2022 -0500 MINOR: Change KRaft ZK controller registration algorithm (#12973) Increment the value in "/controller_epoch" when registering a KRaft controller as the active controller. Use the "kraftControllerEpoch" stored under "/controller" to ensure we are registering a newer KRaft controller. Reviewers: Colin P. McCabe <[email protected]> --- core/src/main/scala/kafka/zk/KafkaZkClient.scala | 102 +++++++++++++-------- core/src/main/scala/kafka/zk/ZkData.scala | 15 ++- .../main/scala/kafka/zk/ZkMigrationClient.scala | 9 +- .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 68 +++++++++++++- .../unit/kafka/zk/ZkMigrationClientTest.scala | 29 ++++-- 5 files changed, 166 insertions(+), 57 deletions(-) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 115446572e1..361c74c5a7c 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -45,6 +45,10 @@ import org.apache.zookeeper.{CreateMode, KeeperException, OpResult, ZooKeeper} import scala.collection.{Map, Seq, mutable} +sealed trait KRaftRegistrationResult +case class FailedRegistrationResult() extends KRaftRegistrationResult +case class SuccessfulRegistrationResult(zkControllerEpoch: Int, controllerEpochZkVersion: Int) extends KRaftRegistrationResult + /** * Provides higher level Kafka-specific operations on top of the pipelined [[kafka.zookeeper.ZooKeeperClient]]. * @@ -167,67 +171,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo * the migration. * * To ensure that the KRaft controller epoch exceeds the current ZK controller epoch, this registration algorithm - * uses a conditional update on the /controller_epoch znode. If a new ZK controller is elected during this method, - * the conditional update on /controller_epoch fails which causes the whole multi-op transaction to fail. + * uses a conditional update on the /controller and /controller_epoch znodes. + * + * If a new controller is registered concurrently with this registration, one of the two will fail the CAS + * operation on /controller_epoch. For KRaft, we have an extra guard against the registered KRaft epoch going + * backwards. If a KRaft controller had previously registered, an additional CAS operation is done on the /controller + * ZNode to ensure that the KRaft epoch being registered is newer. * * @param kraftControllerId ID of the KRaft controller node * @param kraftControllerEpoch Epoch of the KRaft controller node - * @return An optional of the new zkVersion of /controller_epoch. None if we could not register the KRaft controller. + * @return A result object containing the written ZK controller epoch and version, or nothing. */ - def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): Option[Int] = { + def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, kraftControllerEpoch: Int): KRaftRegistrationResult = { val timestamp = time.milliseconds() val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, e._2.getVersion)) - val controllerOpt = getControllerId - val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO Remove this after KAFKA-14436 + val controllerOpt = getControllerRegistration + + // If we have a KRaft epoch registered in /controller, and it is not _older_ than the requested epoch, throw an error. + controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk => + if (kraftEpochInZk >= kraftControllerEpoch) { + throw new ControllerMovedException(s"Cannot register KRaft controller $kraftControllerId with epoch $kraftControllerEpoch " + + s"as the current controller register in ZK has the same or newer epoch $kraftEpochInZk.") + } + } + curEpochOpt match { case None => throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " + s"since there is no ZK controller epoch present.") case Some((curEpoch: Int, curEpochZk: Int)) => - if (curEpoch >= controllerEpochToStore) { - // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK - throw new IllegalStateException(s"Cannot register KRaft controller $kraftControllerId as the active controller " + - s"in ZK since its epoch ${controllerEpochToStore} is not higher than the current ZK epoch ${curEpoch}.") - } - - val response = if (controllerOpt.isDefined) { - info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " + - s"controller with epoch $controllerEpochToStore. The previous controller was ${controllerOpt.get}.") - retryRequestUntilConnected( - MultiRequest(Seq( - SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk), - DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion), - CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), - defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) - ) - } else { - info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " + - s"controller with epoch $controllerEpochToStore. There was no active controller.") - retryRequestUntilConnected( - MultiRequest(Seq( - SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk), - CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp), - defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) - ) + val newControllerEpoch = curEpoch + 1 + + val response = controllerOpt match { + case Some(controller) => + info(s"KRaft controller $kraftControllerId overwriting ${ControllerZNode.path} to become the active " + + s"controller with ZK epoch $newControllerEpoch. The previous controller was ${controller.broker}.") + retryRequestUntilConnected( + MultiRequest(Seq( + SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk), + DeleteOp(ControllerZNode.path, controller.zkVersion), + CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch), + defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) + ) + case None => + info(s"KRaft controller $kraftControllerId creating ${ControllerZNode.path} to become the active " + + s"controller with ZK epoch $newControllerEpoch. There was no active controller.") + retryRequestUntilConnected( + MultiRequest(Seq( + SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), curEpochZk), + CreateOp(ControllerZNode.path, ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch), + defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT))) + ) } - val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with epoch " + - s"$controllerEpochToStore. KRaft controller was not registered." + val failureSuffix = s"while trying to register KRaft controller $kraftControllerId with ZK epoch " + + s"$newControllerEpoch. KRaft controller was not registered." response.resultCode match { case Code.OK => - info(s"Successfully registered KRaft controller $kraftControllerId with epoch $controllerEpochToStore") + info(s"Successfully registered KRaft controller $kraftControllerId with ZK epoch $newControllerEpoch") // First op is always SetData on /controller_epoch val setDataResult = response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult] - Some(setDataResult.getStat.getVersion) + SuccessfulRegistrationResult(newControllerEpoch, setDataResult.getStat.getVersion) case Code.BADVERSION => - info(s"The controller epoch changed $failureSuffix") - None + info(s"The ZK controller epoch changed $failureSuffix") + FailedRegistrationResult() case Code.NONODE => info(s"The ephemeral node at ${ControllerZNode.path} went away $failureSuffix") - None + FailedRegistrationResult() case Code.NODEEXISTS => info(s"The ephemeral node at ${ControllerZNode.path} was created by another controller $failureSuffix") - None + FailedRegistrationResult() case code => error(s"ZooKeeper had an error $failureSuffix") throw KeeperException.create(code) @@ -1210,6 +1223,17 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo } } + + def getControllerRegistration: Option[ZKControllerRegistration] = { + val getDataRequest = GetDataRequest(ControllerZNode.path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + getDataResponse.resultCode match { + case Code.OK => Some(ControllerZNode.decodeController(getDataResponse.data, getDataResponse.stat.getVersion)) + case Code.NONODE => None + case _ => throw getDataResponse.resultException.get + } + } + /** * Deletes the controller znode. * @param expectedControllerEpochZkVersion expected controller epoch zkVersion. diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index 84b767d4d3b..b0337b90062 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -59,15 +59,28 @@ object ControllerZNode { def path = "/controller" def encode(brokerId: Int, timestamp: Long, kraftControllerEpoch: Int = -1): Array[Byte] = { Json.encodeAsBytes(Map( - "version" -> 2, "brokerid" -> brokerId, + "version" -> 2, + "brokerid" -> brokerId, "timestamp" -> timestamp.toString, "kraftControllerEpoch" -> kraftControllerEpoch).asJava) } def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => js.asJsonObject("brokerid").to[Int] } + def decodeController(bytes: Array[Byte], zkVersion: Int): ZKControllerRegistration = Json.tryParseBytes(bytes) match { + case Right(json) => + val controller = json.asJsonObject + val brokerId = controller("brokerid").to[Int] + val kraftControllerEpoch = controller.get("kraftControllerEpoch").map(j => j.to[Int]) + ZKControllerRegistration(brokerId, kraftControllerEpoch, zkVersion) + + case Left(err) => + throw new KafkaException(s"Failed to parse ZooKeeper registration for controller: ${new String(bytes, UTF_8)}", err) + } } +case class ZKControllerRegistration(broker: Int, kraftEpoch: Option[Int], zkVersion: Int) + object ControllerEpochZNode { def path = "/controller_epoch" def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8) diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala index 77f46b9c794..017f773ee21 100644 --- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala +++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala @@ -53,12 +53,9 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends MigrationClient with Lo } override def claimControllerLeadership(state: ZkMigrationLeadershipState): ZkMigrationLeadershipState = { - val epochZkVersionOpt = zkClient.tryRegisterKRaftControllerAsActiveController( - state.kraftControllerId(), state.kraftControllerEpoch()) - if (epochZkVersionOpt.isDefined) { - state.withControllerZkVersion(epochZkVersionOpt.get) - } else { - state.withControllerZkVersion(-1) + zkClient.tryRegisterKRaftControllerAsActiveController(state.kraftControllerId(), state.kraftControllerEpoch()) match { + case SuccessfulRegistrationResult(_, controllerEpochZkVersion) => state.withControllerZkVersion(controllerEpochZkVersion) + case FailedRegistrationResult() => state.withControllerZkVersion(-1) } } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 11eae3386f8..416abd23eb2 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -17,7 +17,7 @@ package kafka.zk import java.nio.charset.StandardCharsets.UTF_8 -import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.{Collections, Properties} import kafka.api.LeaderAndIsr import kafka.cluster.{Broker, EndPoint} @@ -1199,6 +1199,72 @@ class KafkaZkClientTest extends QuorumTestHarness { "Updating with wrong ZK version returns BADVERSION") } + @Test + def testRegisterZkControllerAfterKRaft(): Unit = { + // Register KRaft + var controllerEpochZkVersion = -1 + zkClient.tryRegisterKRaftControllerAsActiveController(3000, 42) match { + case SuccessfulRegistrationResult(kraftEpoch, zkVersion) => + assertEquals(2, kraftEpoch) + controllerEpochZkVersion = zkVersion + case FailedRegistrationResult() => fail("Expected to register KRaft as controller in ZK") + } + assertEquals(1, controllerEpochZkVersion) + + // Can't register ZK anymore + assertThrows(classOf[ControllerMovedException], () => zkClient.registerControllerAndIncrementControllerEpoch(1)) + + // Delete controller, and try again + zkClient.deleteController(controllerEpochZkVersion) + val (newEpoch, newZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(1) + assertEquals(3, newEpoch) + assertEquals(2, newZkVersion) + + zkClient.tryRegisterKRaftControllerAsActiveController(3000, 42) match { + case SuccessfulRegistrationResult(zkEpoch, zkVersion) => + assertEquals(4, zkEpoch) + assertEquals(3, zkVersion) + case FailedRegistrationResult() => fail("Expected to register KRaft as controller in ZK") + } + } + + @Test + def testConcurrentKRaftControllerClaim(): Unit = { + // Setup three threads to race on registering a KRaft controller in ZK + val registeredEpochs = new java.util.concurrent.ConcurrentLinkedQueue[Integer]() + val registeringNodes = new java.util.concurrent.ConcurrentHashMap[Integer, Integer]() + + def newThread(nodeId: Int): Runnable = { + () => { + 0.to(999).foreach(epoch => + zkClient.tryRegisterKRaftControllerAsActiveController(nodeId, epoch) match { + case SuccessfulRegistrationResult(writtenEpoch, _) => + registeredEpochs.add(writtenEpoch) + registeringNodes.compute(nodeId, (_, count) => if (count == null) { + 0 + } else { + count + 1 + }) + case FailedRegistrationResult() => + } + ) + } + } + val thread1 = newThread(1) + val thread2 = newThread(2) + val thread3 = newThread(3) + val executor = Executors.newFixedThreadPool(3) + executor.submit(thread1) + executor.submit(thread2) + executor.submit(thread3) + executor.shutdown() + executor.awaitTermination(30, TimeUnit.SECONDS) + + assertEquals(1000, registeredEpochs.size()) + val uniqueEpochs = registeredEpochs.asScala.toSet + assertEquals(1000, uniqueEpochs.size) + } + @Test def testControllerManagementMethods(): Unit = { // No controller diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala index 7fae24f650e..a8493d027d5 100644 --- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala @@ -42,6 +42,9 @@ import scala.jdk.CollectionConverters._ */ class ZkMigrationClientTest extends QuorumTestHarness { + private val InitialControllerEpoch: Int = 42 + private val InitialKRaftEpoch: Int = 0 + private var migrationClient: ZkMigrationClient = _ private var migrationState: ZkMigrationLeadershipState = _ @@ -58,7 +61,7 @@ class ZkMigrationClientTest extends QuorumTestHarness { private def initialMigrationState: ZkMigrationLeadershipState = { val (_, stat) = zkClient.getControllerEpoch.get - new ZkMigrationLeadershipState(3000, 42, 100, 42, Time.SYSTEM.milliseconds(), -1, stat.getVersion) + new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, stat.getVersion) } @Test @@ -236,16 +239,22 @@ class ZkMigrationClientTest extends QuorumTestHarness { def testNonIncreasingKRaftEpoch(): Unit = { assertEquals(0, migrationState.migrationZkVersion()) + migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch) migrationState = migrationClient.claimControllerLeadership(migrationState) assertEquals(1, migrationState.controllerZkVersion()) - migrationState = migrationState.withNewKRaftController(3000, 40) - val t1 = assertThrows(classOf[IllegalStateException], () => migrationClient.claimControllerLeadership(migrationState)) - assertEquals("Cannot register KRaft controller 3000 as the active controller in ZK since its epoch 10000040 is not higher than the current ZK epoch 10000042.", t1.getMessage) + migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch - 1) + val t1 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState)) + assertEquals("Cannot register KRaft controller 3001 with epoch 41 as the current controller register in ZK has the same or newer epoch 42.", t1.getMessage) + + migrationState = migrationState.withNewKRaftController(3001, InitialControllerEpoch) + val t2 = assertThrows(classOf[ControllerMovedException], () => migrationClient.claimControllerLeadership(migrationState)) + assertEquals("Cannot register KRaft controller 3001 with epoch 42 as the current controller register in ZK has the same or newer epoch 42.", t2.getMessage) - migrationState = migrationState.withNewKRaftController(3000, 42) - val t2 = assertThrows(classOf[IllegalStateException], () => migrationClient.claimControllerLeadership(migrationState)) - assertEquals("Cannot register KRaft controller 3000 as the active controller in ZK since its epoch 10000042 is not higher than the current ZK epoch 10000042.", t2.getMessage) + migrationState = migrationState.withNewKRaftController(3001, 100) + migrationState = migrationClient.claimControllerLeadership(migrationState) + assertEquals(migrationState.kraftControllerEpoch(), 100) + assertEquals(migrationState.kraftControllerId(), 3001) } @Test @@ -259,8 +268,8 @@ class ZkMigrationClientTest extends QuorumTestHarness { migrationState = migrationClient.claimControllerLeadership(migrationState) assertEquals(2, migrationState.controllerZkVersion()) zkClient.getControllerEpoch match { - case Some((kraftEpoch, stat)) => - assertEquals(10000042, kraftEpoch) + case Some((zkEpoch, stat)) => + assertEquals(3, zkEpoch) assertEquals(2, stat.getVersion) case None => fail() } @@ -269,7 +278,7 @@ class ZkMigrationClientTest extends QuorumTestHarness { migrationState = migrationClient.releaseControllerLeadership(migrationState) val (epoch1, zkVersion1) = zkClient.registerControllerAndIncrementControllerEpoch(100) - assertEquals(epoch1, 10000043) + assertEquals(epoch1, 4) assertEquals(zkVersion1, 3) }
