This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 47c5b001be1deea5a4542e52baf0738def36b430
Author: David Arthur <[email protected]>
AuthorDate: Wed Dec 14 08:37:06 2022 -0500

    MINOR: Change KRaft ZK controller registration algorithm (#12973)
    
    Increment the value in "/controller_epoch" when registering a KRaft 
controller as the active controller. Use the "kraftControllerEpoch" stored 
under "/controller" to ensure we are registering a newer KRaft controller.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 core/src/main/scala/kafka/zk/KafkaZkClient.scala   | 102 +++++++++++++--------
 core/src/main/scala/kafka/zk/ZkData.scala          |  15 ++-
 .../main/scala/kafka/zk/ZkMigrationClient.scala    |   9 +-
 .../scala/unit/kafka/zk/KafkaZkClientTest.scala    |  68 +++++++++++++-
 .../unit/kafka/zk/ZkMigrationClientTest.scala      |  29 ++++--
 5 files changed, 166 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 115446572e1..361c74c5a7c 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -45,6 +45,10 @@ import org.apache.zookeeper.{CreateMode, KeeperException, 
OpResult, ZooKeeper}
 
 import scala.collection.{Map, Seq, mutable}
 
+sealed trait KRaftRegistrationResult
+case class FailedRegistrationResult() extends KRaftRegistrationResult
+case class SuccessfulRegistrationResult(zkControllerEpoch: Int, 
controllerEpochZkVersion: Int) extends KRaftRegistrationResult
+
 /**
  * Provides higher level Kafka-specific operations on top of the pipelined 
[[kafka.zookeeper.ZooKeeperClient]].
  *
@@ -167,67 +171,76 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
    * the migration.
    *
    * To ensure that the KRaft controller epoch exceeds the current ZK 
controller epoch, this registration algorithm
-   * uses a conditional update on the /controller_epoch znode. If a new ZK 
controller is elected during this method,
-   * the conditional update on /controller_epoch fails which causes the whole 
multi-op transaction to fail.
+   * uses a conditional update on the /controller and /controller_epoch znodes.
+   *
+   * If a new controller is registered concurrently with this registration, 
one of the two will fail the CAS
+   * operation on /controller_epoch. For KRaft, we have an extra guard against 
the registered KRaft epoch going
+   * backwards. If a KRaft controller had previously registered, an additional 
CAS operation is done on the /controller
+   * ZNode to ensure that the KRaft epoch being registered is newer.
    *
    * @param kraftControllerId ID of the KRaft controller node
    * @param kraftControllerEpoch Epoch of the KRaft controller node
-   * @return An optional of the new zkVersion of /controller_epoch. None if we 
could not register the KRaft controller.
+   * @return [[SuccessfulRegistrationResult]] holding the new ZK controller epoch 
and the resulting /controller_epoch zkVersion, or [[FailedRegistrationResult]] 
if the conditional multi-op failed with BADVERSION, NONODE or NODEEXISTS 
(/controller or /controller_epoch changed concurrently). Throws 
ControllerMovedException if /controller already records a KRaft epoch >= 
kraftControllerEpoch, IllegalStateException if no ZK controller epoch is 
present, and a KeeperException for any other ZooKeeper error code.
    */
-  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): Option[Int] = {
+  def tryRegisterKRaftControllerAsActiveController(kraftControllerId: Int, 
kraftControllerEpoch: Int): KRaftRegistrationResult = {
     val timestamp = time.milliseconds()
     val curEpochOpt: Option[(Int, Int)] = getControllerEpoch.map(e => (e._1, 
e._2.getVersion))
-    val controllerOpt = getControllerId
-    val controllerEpochToStore = kraftControllerEpoch + 10000000 // TODO 
Remove this after KAFKA-14436
+    val controllerOpt = getControllerRegistration
+
+    // If we have a KRaft epoch registered in /controller, and it is not 
_older_ than the requested epoch, throw an error.
+    controllerOpt.flatMap(_.kraftEpoch).foreach { kraftEpochInZk =>
+      if (kraftEpochInZk >= kraftControllerEpoch) {
+        throw new ControllerMovedException(s"Cannot register KRaft controller 
$kraftControllerId with epoch $kraftControllerEpoch " +
+          s"as the current controller register in ZK has the same or newer 
epoch $kraftEpochInZk.")
+      }
+    }
+
     curEpochOpt match {
       case None =>
         throw new IllegalStateException(s"Cannot register KRaft controller 
$kraftControllerId as the active controller " +
           s"since there is no ZK controller epoch present.")
       case Some((curEpoch: Int, curEpochZk: Int)) =>
-        if (curEpoch >= controllerEpochToStore) {
-          // TODO KAFKA-14436 Need to ensure KRaft has a higher epoch an ZK
-          throw new IllegalStateException(s"Cannot register KRaft controller 
$kraftControllerId as the active controller " +
-            s"in ZK since its epoch ${controllerEpochToStore} is not higher 
than the current ZK epoch ${curEpoch}.")
-        }
-
-        val response = if (controllerOpt.isDefined) {
-          info(s"KRaft controller $kraftControllerId overwriting 
${ControllerZNode.path} to become the active " +
-            s"controller with epoch $controllerEpochToStore. The previous 
controller was ${controllerOpt.get}.")
-          retryRequestUntilConnected(
-            MultiRequest(Seq(
-              SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-              DeleteOp(ControllerZNode.path, ZkVersion.MatchAnyVersion),
-              CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp),
-                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-          )
-        } else {
-          info(s"KRaft controller $kraftControllerId creating 
${ControllerZNode.path} to become the active " +
-            s"controller with epoch $controllerEpochToStore. There was no 
active controller.")
-          retryRequestUntilConnected(
-            MultiRequest(Seq(
-              SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(controllerEpochToStore), curEpochZk),
-              CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp),
-                defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
-          )
+        val newControllerEpoch = curEpoch + 1
+
+        val response = controllerOpt match {
+          case Some(controller) =>
+            info(s"KRaft controller $kraftControllerId overwriting 
${ControllerZNode.path} to become the active " +
+              s"controller with ZK epoch $newControllerEpoch. The previous 
controller was ${controller.broker}.")
+            retryRequestUntilConnected(
+              MultiRequest(Seq(
+                SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
+                DeleteOp(ControllerZNode.path, controller.zkVersion),
+                CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch),
+                  defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
+            )
+          case None =>
+            info(s"KRaft controller $kraftControllerId creating 
${ControllerZNode.path} to become the active " +
+              s"controller with ZK epoch $newControllerEpoch. There was no 
active controller.")
+            retryRequestUntilConnected(
+              MultiRequest(Seq(
+                SetDataOp(ControllerEpochZNode.path, 
ControllerEpochZNode.encode(newControllerEpoch), curEpochZk),
+                CreateOp(ControllerZNode.path, 
ControllerZNode.encode(kraftControllerId, timestamp, kraftControllerEpoch),
+                  defaultAcls(ControllerZNode.path), CreateMode.PERSISTENT)))
+            )
         }
 
-        val failureSuffix = s"while trying to register KRaft controller 
$kraftControllerId with epoch " +
-          s"$controllerEpochToStore. KRaft controller was not registered."
+        val failureSuffix = s"while trying to register KRaft controller 
$kraftControllerId with ZK epoch " +
+          s"$newControllerEpoch. KRaft controller was not registered."
         response.resultCode match {
           case Code.OK =>
-            info(s"Successfully registered KRaft controller $kraftControllerId 
with epoch $controllerEpochToStore")
+            info(s"Successfully registered KRaft controller $kraftControllerId 
with ZK epoch $newControllerEpoch")
             // First op is always SetData on /controller_epoch
             val setDataResult = 
response.zkOpResults(0).rawOpResult.asInstanceOf[SetDataResult]
-            Some(setDataResult.getStat.getVersion)
+            SuccessfulRegistrationResult(newControllerEpoch, 
setDataResult.getStat.getVersion)
           case Code.BADVERSION =>
-            info(s"The controller epoch changed $failureSuffix")
-            None
+            info(s"The ZK controller epoch changed $failureSuffix")
+            FailedRegistrationResult()
           case Code.NONODE =>
             info(s"The ephemeral node at ${ControllerZNode.path} went away 
$failureSuffix")
-            None
+            FailedRegistrationResult()
           case Code.NODEEXISTS =>
             info(s"The ephemeral node at ${ControllerZNode.path} was created 
by another controller $failureSuffix")
-            None
+            FailedRegistrationResult()
           case code =>
             error(s"ZooKeeper had an error $failureSuffix")
             throw KeeperException.create(code)
@@ -1210,6 +1223,17 @@ class KafkaZkClient private[zk] (zooKeeperClient: 
ZooKeeperClient, isSecure: Boo
     }
   }
 
+  /** Reads and decodes the /controller znode, keeping its stat version as 
zkVersion. Returns None if the znode does not exist; other ZooKeeper errors are rethrown. */
+  def getControllerRegistration: Option[ZKControllerRegistration] = {
+    val getDataRequest = GetDataRequest(ControllerZNode.path)
+    val getDataResponse = retryRequestUntilConnected(getDataRequest)
+    getDataResponse.resultCode match {
+      case Code.OK => 
Some(ControllerZNode.decodeController(getDataResponse.data, 
getDataResponse.stat.getVersion))
+      case Code.NONODE => None
+      case _ => throw getDataResponse.resultException.get
+    }
+  }
+
   /**
    * Deletes the controller znode.
    * @param expectedControllerEpochZkVersion expected controller epoch 
zkVersion.
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala 
b/core/src/main/scala/kafka/zk/ZkData.scala
index 84b767d4d3b..b0337b90062 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -59,15 +59,28 @@ object ControllerZNode {
   def path = "/controller"
   def encode(brokerId: Int, timestamp: Long, kraftControllerEpoch: Int = -1): 
Array[Byte] = {
     Json.encodeAsBytes(Map(
-      "version" -> 2, "brokerid" -> brokerId,
+      "version" -> 2,
+      "brokerid" -> brokerId,
       "timestamp" -> timestamp.toString,
       "kraftControllerEpoch" -> kraftControllerEpoch).asJava)
   }
   def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { 
js =>
     js.asJsonObject("brokerid").to[Int]
   }
+  def decodeController(bytes: Array[Byte], zkVersion: Int): 
ZKControllerRegistration = Json.tryParseBytes(bytes) match {
+    case Right(json) =>
+      val controller = json.asJsonObject
+      val brokerId = controller("brokerid").to[Int]
+      val kraftControllerEpoch = controller.get("kraftControllerEpoch").map(j 
=> j.to[Int]) // missing field => None; encode's default (-1) decodes as Some(-1)
+      ZKControllerRegistration(brokerId, kraftControllerEpoch, zkVersion)
+
+    case Left(err) =>
+      throw new KafkaException(s"Failed to parse ZooKeeper registration for 
controller: ${new String(bytes, UTF_8)}", err)
+  }
 }
 
+case class ZKControllerRegistration(broker: Int, kraftEpoch: Option[Int], 
zkVersion: Int) // zkVersion: the caller-supplied /controller znode version (stat version at read time)
+
 object ControllerEpochZNode {
   def path = "/controller_epoch"
   def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8)
diff --git a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala 
b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
index 77f46b9c794..017f773ee21 100644
--- a/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
+++ b/core/src/main/scala/kafka/zk/ZkMigrationClient.scala
@@ -53,12 +53,9 @@ class ZkMigrationClient(zkClient: KafkaZkClient) extends 
MigrationClient with Lo
   }
 
   override def claimControllerLeadership(state: ZkMigrationLeadershipState): 
ZkMigrationLeadershipState = {
-    val epochZkVersionOpt = 
zkClient.tryRegisterKRaftControllerAsActiveController(
-      state.kraftControllerId(), state.kraftControllerEpoch())
-    if (epochZkVersionOpt.isDefined) {
-      state.withControllerZkVersion(epochZkVersionOpt.get)
-    } else {
-      state.withControllerZkVersion(-1)
+    
zkClient.tryRegisterKRaftControllerAsActiveController(state.kraftControllerId(),
 state.kraftControllerEpoch()) match { // exceptions from registration (e.g. ControllerMovedException) propagate to the caller
+      case SuccessfulRegistrationResult(_, controllerEpochZkVersion) => 
state.withControllerZkVersion(controllerEpochZkVersion)
+      case FailedRegistrationResult() => state.withControllerZkVersion(-1) // registration failed: record -1 instead of a /controller_epoch zkVersion
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 11eae3386f8..416abd23eb2 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -17,7 +17,7 @@
 package kafka.zk
 
 import java.nio.charset.StandardCharsets.UTF_8
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.{Collections, Properties}
 import kafka.api.LeaderAndIsr
 import kafka.cluster.{Broker, EndPoint}
@@ -1199,6 +1199,72 @@ class KafkaZkClientTest extends QuorumTestHarness {
       "Updating with wrong ZK version returns BADVERSION")
   }
 
+  @Test
+  def testRegisterZkControllerAfterKRaft(): Unit = {
+    // Register KRaft
+    var controllerEpochZkVersion = -1
+    zkClient.tryRegisterKRaftControllerAsActiveController(3000, 42) match {
+      case SuccessfulRegistrationResult(zkEpoch, zkVersion) =>
+        assertEquals(2, zkEpoch)
+        controllerEpochZkVersion = zkVersion
+      case FailedRegistrationResult() => fail("Expected to register KRaft as 
controller in ZK")
+    }
+    assertEquals(1, controllerEpochZkVersion)
+
+    // Can't register ZK anymore
+    assertThrows(classOf[ControllerMovedException], () => 
zkClient.registerControllerAndIncrementControllerEpoch(1))
+
+    // Delete controller, and try again
+    zkClient.deleteController(controllerEpochZkVersion)
+    val (newEpoch, newZkVersion) = 
zkClient.registerControllerAndIncrementControllerEpoch(1)
+    assertEquals(3, newEpoch)
+    assertEquals(2, newZkVersion)
+
+    zkClient.tryRegisterKRaftControllerAsActiveController(3000, 42) match {
+      case SuccessfulRegistrationResult(zkEpoch, zkVersion) =>
+        assertEquals(4, zkEpoch)
+        assertEquals(3, zkVersion)
+      case FailedRegistrationResult() => fail("Expected to register KRaft as 
controller in ZK")
+    }
+  }
+
+  @Test
+  def testConcurrentKRaftControllerClaim(): Unit = {
+    // Setup three threads to race on registering a KRaft controller in ZK
+    val registeredEpochs = new 
java.util.concurrent.ConcurrentLinkedQueue[Integer]()
+    val registeringNodes = new java.util.concurrent.ConcurrentHashMap[Integer, 
Integer]()
+
+    def newThread(nodeId: Int): Runnable = {
+      () => {
+        0.to(999).foreach(epoch =>
+          zkClient.tryRegisterKRaftControllerAsActiveController(nodeId, epoch) 
match {
+            case SuccessfulRegistrationResult(writtenEpoch, _) =>
+              registeredEpochs.add(writtenEpoch)
+              registeringNodes.compute(nodeId, (_, count) => if (count == 
null) {
+                1
+              } else {
+                count + 1
+              })
+            case FailedRegistrationResult() => // lost the CAS race for this epoch; move on to the next one
+          }
+        )
+      }
+    }
+    val thread1 = newThread(1)
+    val thread2 = newThread(2)
+    val thread3 = newThread(3)
+    val executor = Executors.newFixedThreadPool(3)
+    executor.submit(thread1)
+    executor.submit(thread2)
+    executor.submit(thread3)
+    executor.shutdown()
+    executor.awaitTermination(30, TimeUnit.SECONDS) // NOTE(review): the boolean result and any exception thrown inside the tasks (e.g. ControllerMovedException) are ignored
+
+    assertEquals(1000, registeredEpochs.size())
+    val uniqueEpochs = registeredEpochs.asScala.toSet
+    assertEquals(1000, uniqueEpochs.size)
+  }
+
   @Test
   def testControllerManagementMethods(): Unit = {
     // No controller
diff --git a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
index 7fae24f650e..a8493d027d5 100644
--- a/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZkMigrationClientTest.scala
@@ -42,6 +42,9 @@ import scala.jdk.CollectionConverters._
  */
 class ZkMigrationClientTest extends QuorumTestHarness {
 
+  private val InitialControllerEpoch: Int = 42
+  private val InitialKRaftEpoch: Int = 0
+
   private var migrationClient: ZkMigrationClient = _
 
   private var migrationState: ZkMigrationLeadershipState = _
@@ -58,7 +61,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
   private def initialMigrationState: ZkMigrationLeadershipState = {
     val (_, stat) = zkClient.getControllerEpoch.get
-    new ZkMigrationLeadershipState(3000, 42, 100, 42, 
Time.SYSTEM.milliseconds(), -1, stat.getVersion)
+    new ZkMigrationLeadershipState(3000, InitialControllerEpoch, 100, 
InitialKRaftEpoch, Time.SYSTEM.milliseconds(), -1, stat.getVersion)
   }
 
   @Test
@@ -236,16 +239,22 @@ class ZkMigrationClientTest extends QuorumTestHarness {
   def testNonIncreasingKRaftEpoch(): Unit = {
     assertEquals(0, migrationState.migrationZkVersion())
 
+    migrationState = migrationState.withNewKRaftController(3001, 
InitialControllerEpoch)
     migrationState = migrationClient.claimControllerLeadership(migrationState)
     assertEquals(1, migrationState.controllerZkVersion())
 
-    migrationState = migrationState.withNewKRaftController(3000, 40)
-    val t1 = assertThrows(classOf[IllegalStateException], () => 
migrationClient.claimControllerLeadership(migrationState))
-    assertEquals("Cannot register KRaft controller 3000 as the active 
controller in ZK since its epoch 10000040 is not higher than the current ZK 
epoch 10000042.", t1.getMessage)
+    migrationState = migrationState.withNewKRaftController(3001, 
InitialControllerEpoch - 1)
+    val t1 = assertThrows(classOf[ControllerMovedException], () => 
migrationClient.claimControllerLeadership(migrationState))
+    assertEquals("Cannot register KRaft controller 3001 with epoch 41 as the 
current controller register in ZK has the same or newer epoch 42.", 
t1.getMessage)
+
+    migrationState = migrationState.withNewKRaftController(3001, 
InitialControllerEpoch)
+    val t2 = assertThrows(classOf[ControllerMovedException], () => 
migrationClient.claimControllerLeadership(migrationState))
+    assertEquals("Cannot register KRaft controller 3001 with epoch 42 as the 
current controller register in ZK has the same or newer epoch 42.", 
t2.getMessage)
 
-    migrationState = migrationState.withNewKRaftController(3000, 42)
-    val t2 = assertThrows(classOf[IllegalStateException], () => 
migrationClient.claimControllerLeadership(migrationState))
-    assertEquals("Cannot register KRaft controller 3000 as the active 
controller in ZK since its epoch 10000042 is not higher than the current ZK 
epoch 10000042.", t2.getMessage)
+    migrationState = migrationState.withNewKRaftController(3001, 100)
+    migrationState = migrationClient.claimControllerLeadership(migrationState)
+    assertEquals(100, migrationState.kraftControllerEpoch())
+    assertEquals(3001, migrationState.kraftControllerId())
   }
 
   @Test
@@ -259,8 +268,8 @@ class ZkMigrationClientTest extends QuorumTestHarness {
     migrationState = migrationClient.claimControllerLeadership(migrationState)
     assertEquals(2, migrationState.controllerZkVersion())
     zkClient.getControllerEpoch match {
-      case Some((kraftEpoch, stat)) =>
-        assertEquals(10000042, kraftEpoch)
+      case Some((zkEpoch, stat)) =>
+        assertEquals(3, zkEpoch)
         assertEquals(2, stat.getVersion)
       case None => fail()
     }
@@ -269,7 +278,7 @@ class ZkMigrationClientTest extends QuorumTestHarness {
 
     migrationState = 
migrationClient.releaseControllerLeadership(migrationState)
     val (epoch1, zkVersion1) = 
zkClient.registerControllerAndIncrementControllerEpoch(100)
-    assertEquals(epoch1, 10000043)
+    assertEquals(4, epoch1)
     assertEquals(zkVersion1, 3)
   }
 

Reply via email to