soarez commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1606801722


##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment:
   I don't think this makes sense?
   
   A call to `.close()` in `EmbeddedZookeeper` delegates to `.shutdown()`, so this catch block shuts ZooKeeper down twice. Compared to a direct call to `.close()`, `Utils.closeQuietly()` only adds a null check and a catch-all that logs any exception.
   If we want the exception to be thrown, `.shutdown()` is enough and `closeQuietly` can be removed; if instead we want to catch and log a possible exception, then `.shutdown()` needs to be removed.
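   To make the two options concrete, here is a minimal sketch of what each variant of the catch block could look like (alternative bodies, not both at once):
   ```scala
   // Option 1: let any shutdown exception propagate (no logging)
   case t: Throwable =>
     zookeeper.shutdown()
     if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     throw t
   
   // Option 2: catch and log a possible close exception via closeQuietly
   case t: Throwable =>
     Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
     if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     throw t
   ```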



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta.replay(ZkMigrationState.MIGRATION.toRecord.message)
+
+      val provenance = new MetadataProvenance(210, 11, 1)
+      image = delta.apply(provenance)
+
+      val manifest = LogDeltaManifest.newBuilder()
+        .provenance(provenance)
+        .leaderAndEpoch(leader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image into driver 3000 and send a leader event; this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta, image, manifest)
+      driver1.onControllerChange(leader1)
+      driver2.onControllerChange(leader1)
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver1.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Enqueue several metadata deltas and then process a leader change.
+      for (i <- 1 to 1000) {
+        val delta = new MetadataDelta(image)
+        delta.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName(s"topic-$i"))
+        val provenance = new MetadataProvenance(210 + i, 11, 1)
+        image = delta.apply(provenance)
+        val manifest = LogDeltaManifest.newBuilder()
+          .provenance(provenance)
+          .leaderAndEpoch(leader1)
+          .numBatches(1)
+          .elapsedNs(100)
+          .numBytes(42)
+          .build()
+        driver1.onMetadataUpdate(delta, image, manifest)
+        driver2.onMetadataUpdate(delta, image, manifest)
+      }
+      Thread.sleep(50) // Give some events a chance to run
+
+      val leader2 = new LeaderAndEpoch(OptionalInt.of(3001), 3)
+      driver1.onControllerChange(leader2)
+
+      Thread.sleep(50) // Artificial delay for 3001 to see the leader change.
+      driver2.onControllerChange(leader2)
+
+      // Wait for driver 2 to become leader in ZK
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
+        case Some(nodeId) => nodeId == 3001
+        case None => false
+      }, "waiting for 3001 to claim ZK leadership")
+
+      TestUtils.waitUntilTrue(() => {
+        val topics = zkClient.getAllTopicsInCluster(false)
+        topics.size == 1000
+      }, "waiting for topics to be created in ZK.")
+
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect first leader to write some topics")
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")

Review Comment:
   ```suggestion
         assertTrue(topicClient2.createdTopics.nonEmpty, "Expect second leader to write some topics")
   ```



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -619,34 +679,45 @@ public void run() throws Exception {
         }
     }
 
-    class BecomeZkControllerEvent extends MigrationEvent {
+    class BecomeZkControllerEvent extends MigrationEvent implements MigrationWriteEvent {
+        private final LeaderAndEpoch leaderAndEpoch;
+
+        BecomeZkControllerEvent(LeaderAndEpoch leaderAndEpoch) {
+            this.leaderAndEpoch = leaderAndEpoch;
+        }
+
         @Override
         public void run() throws Exception {
+            // The leader epoch check in checkDriverState prevents us from getting stuck retrying this event after a
+            // new leader has been seen.
             if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) {
-                applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
+                applyMigrationOperation("Claimed ZK controller leadership", zkMigrationClient::claimControllerLeadership, true);
                 if (migrationLeadershipState.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION) {
                     log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
+                    return; // Stay in BECOME_CONTROLLER state and retry
+                }
+
+                // KAFKA-16171 and KAFKA-16667: Prior to writing to the /controller and /controller_epoch ZNodes above,
+                // the previous controller could have modified the /migration ZNode. Since ZK does not grant us
+                // linearizability between writes and reads on different ZNodes, we need to write something to the
+                // /migration ZNode to ensure we have the latest /migration zkVersion.
+                applyMigrationOperation("Updated migration state", state -> {
+                    state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration

Review Comment:
   Suggestion, as it's not obvious how this leads to the unconditional update.
   
   ```suggestion
                    state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration via KafkaZkClient#retryRequestsUntilConnected
   ```
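   For additional context on why `-1` works (a sketch, not the actual `KafkaZkClient` code): ZooKeeper's `setData` treats an expected version of `-1` as "match any version", so carrying the `-1` through to the request is what skips the conditional-update check.
   ```scala
   import org.apache.zookeeper.ZooKeeper
   
   // Sketch: with expectedVersion = -1 the server performs no zkVersion check,
   // so this SetData never fails with BADVERSION and always bumps the version.
   def unconditionalSet(zk: ZooKeeper, payload: Array[Byte]): Unit =
     zk.setData("/migration", payload, -1)
   ```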
   



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -491,6 +536,13 @@ public void run() throws Exception {
                 return;
             }
 
+            if (!curLeaderAndEpoch.equals(leaderAndEpoch)) {

Review Comment:
   Are any of the previous actions/checks in this method worth running if the epoch is stale? Why not also add `MetadataChangeEvent` to the set of events that should be skipped in this situation? It might need a different name than `MigrationWriteEvent`, but does the generalization make sense?



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))

Review Comment:
   Should we test against `MetadataVersion.latestProduction()` instead?



##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -230,16 +234,24 @@ private boolean areZkBrokersReadyForMigration() {
      * @param name         A descriptive name of the function that is being applied
      * @param migrationOp  A function which performs some migration operations and possibly transforms our internal state
      */
+
     private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) {
+        applyMigrationOperation(name, migrationOp, false);
+    }
+
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp, boolean alwaysLog) {
         ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
+        long startTimeNs = time.nanoseconds();
         ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
-        if (afterState.loggableChangeSinceState(beforeState)) {
-            log.info("{}. Transitioned migration state from {} to {}", name, beforeState, afterState);
+        long durationNs = time.nanoseconds() - startTimeNs;
+        if (afterState.loggableChangeSinceState(beforeState) || alwaysLog) {
+            log.info("{} in {} ns. Transitioned migration state from {} to {}",
+                name, durationNs, beforeState, afterState);

Review Comment:
   If `alwaysLog == true`, do we still want to log this if `afterState.equals(beforeState)`?



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta.replay(ZkMigrationState.MIGRATION.toRecord.message)
+
+      val provenance = new MetadataProvenance(210, 11, 1)
+      image = delta.apply(provenance)
+
+      val manifest = LogDeltaManifest.newBuilder()
+        .provenance(provenance)
+        .leaderAndEpoch(leader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image into driver 3000 and send a leader event; this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta, image, manifest)
+      driver1.onControllerChange(leader1)
+      driver2.onControllerChange(leader1)
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver1.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Enqueue several metadata deltas and then process a leader change.
+      for (i <- 1 to 1000) {
+        val delta = new MetadataDelta(image)
+        delta.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName(s"topic-$i"))
+        val provenance = new MetadataProvenance(210 + i, 11, 1)
+        image = delta.apply(provenance)
+        val manifest = LogDeltaManifest.newBuilder()
+          .provenance(provenance)
+          .leaderAndEpoch(leader1)
+          .numBatches(1)
+          .elapsedNs(100)
+          .numBytes(42)
+          .build()
+        driver1.onMetadataUpdate(delta, image, manifest)
+        driver2.onMetadataUpdate(delta, image, manifest)
+      }
+      Thread.sleep(50) // Give some events a chance to run
+
+      val leader2 = new LeaderAndEpoch(OptionalInt.of(3001), 3)
+      driver1.onControllerChange(leader2)
+
+      Thread.sleep(50) // Artificial delay for 3001 to see the leader change.
+      driver2.onControllerChange(leader2)
+
+      // Wait for driver 2 to become leader in ZK
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
+        case Some(nodeId) => nodeId == 3001
+        case None => false
+      }, "waiting for 3001 to claim ZK leadership")
+
+      TestUtils.waitUntilTrue(() => {
+        val topics = zkClient.getAllTopicsInCluster(false)
+        topics.size == 1000
+      }, "waiting for topics to be created in ZK.")
+
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect first leader to write some topics")
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")
+      assertEquals(1000, topicClient1.createdTopics.size + topicClient2.createdTopics.size,
+        "Expect drivers to only write to ZK if they are the leader")
+    } finally {
+      driver1.close()
+      driver2.close()
+      Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+      zookeeper.shutdown()
+      if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")

Review Comment:
   Same comment here about the double call to `.shutdown()`.
   
   Also, there is a gap between the try-catch and the try-finally blocks, where a throwable would take execution out of this method without closing these.
   
   Since the only other test in this class, `testControllerFailoverZkRace`, also makes use of `EmbeddedZookeeper` and `KafkaZkClient`, it might be worth moving this cleanup to a common teardown method.
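   As a rough sketch (assuming `zookeeper` and `zkClient` become class fields initialized in a setup method; names here are just illustrative):
   ```scala
   import org.junit.jupiter.api.AfterEach
   
   @AfterEach
   def tearDown(): Unit = {
     // closeQuietly is null-safe and logs (rather than throws) close failures,
     // so this runs even if a test failed before creating the client.
     Utils.closeQuietly(zkClient, "KafkaZkClient")
     Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
   }
   ```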



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
