soarez commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1606801722
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment:
I don't think this makes sense? A call to `.close()` in `EmbeddedZookeeper` delegates to `.shutdown()`. In addition to making a direct call to `.close()`, `Utils.closeQuietly()` checks for null and logs any exception in a catch-all. If we want the exception to be thrown, `.shutdown()` is enough and `closeQuietly` can be removed; if instead we want to catch and log a possible exception, then `.shutdown()` needs to be removed.
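To make the two options concrete, here is a minimal sketch of the catch block if we keep the catch-and-log behavior (a sketch only, not from the PR; the alternative would keep a bare `zookeeper.shutdown()` and drop `closeQuietly` instead):

```scala
} catch {
  case t: Throwable =>
    // Utils.closeQuietly null-checks its argument and delegates to
    // EmbeddedZookeeper.shutdown() via close(), so a separate
    // zookeeper.shutdown() call here would be redundant.
    Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
    if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
    throw t
}
```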
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta.replay(ZkMigrationState.MIGRATION.toRecord.message)
+
+      val provenance = new MetadataProvenance(210, 11, 1)
+      image = delta.apply(provenance)
+
+      val manifest = LogDeltaManifest.newBuilder()
+        .provenance(provenance)
+        .leaderAndEpoch(leader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image and a leader event into 3000; this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta, image, manifest)
+      driver1.onControllerChange(leader1)
+      driver2.onControllerChange(leader1)
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver1.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Enqueue several metadata deltas and then process a leader change.
+      for (i <- 1 to 1000) {
+        val delta = new MetadataDelta(image)
+        delta.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName(s"topic-$i"))
+        val provenance = new MetadataProvenance(210 + i, 11, 1)
+        image = delta.apply(provenance)
+        val manifest = LogDeltaManifest.newBuilder()
+          .provenance(provenance)
+          .leaderAndEpoch(leader1)
+          .numBatches(1)
+          .elapsedNs(100)
+          .numBytes(42)
+          .build()
+        driver1.onMetadataUpdate(delta, image, manifest)
+        driver2.onMetadataUpdate(delta, image, manifest)
+      }
+      Thread.sleep(50) // Give some events a chance to run
+
+      val leader2 = new LeaderAndEpoch(OptionalInt.of(3001), 3)
+      driver1.onControllerChange(leader2)
+
+      Thread.sleep(50) // Artificial delay for 3001 to see the leader change.
+      driver2.onControllerChange(leader2)
+
+      // Wait for driver 2 to become leader in ZK
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
+        case Some(nodeId) => nodeId == 3001
+        case None => false
+      }, "waiting for 3001 to claim ZK leadership")
+
+      TestUtils.waitUntilTrue(() => {
+        val topics = zkClient.getAllTopicsInCluster(false)
+        topics.size == 1000
+      }, "waiting for topics to be created in ZK.")
+
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect first leader to write some topics")
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")

Review Comment:
```suggestion
      assertTrue(topicClient2.createdTopics.nonEmpty, "Expect second leader to write some topics")
```
##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -619,34 +679,45 @@ public void run() throws Exception {
         }
     }
 
-    class BecomeZkControllerEvent extends MigrationEvent {
+    class BecomeZkControllerEvent extends MigrationEvent implements MigrationWriteEvent {
+        private final LeaderAndEpoch leaderAndEpoch;
+
+        BecomeZkControllerEvent(LeaderAndEpoch leaderAndEpoch) {
+            this.leaderAndEpoch = leaderAndEpoch;
+        }
+
         @Override
         public void run() throws Exception {
+            // The leader epoch check in checkDriverState prevents us from getting stuck retrying this event after a
+            // new leader has been seen.
             if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER, this)) {
-                applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
+                applyMigrationOperation("Claimed ZK controller leadership", zkMigrationClient::claimControllerLeadership, true);
                 if (migrationLeadershipState.zkControllerEpochZkVersion() == ZkMigrationLeadershipState.UNKNOWN_ZK_VERSION) {
                     log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
+                    return; // Stay in BECOME_CONTROLLER state and retry
+                }
+
+                // KAFKA-16171 and KAFKA-16667: Prior to writing to /controller and /controller_epoch ZNodes above,
+                // the previous controller could have modified the /migration ZNode. Since ZK does not grant us linearizability
+                // between writes and reads on different ZNodes, we need to write something to the /migration ZNode to
+                // ensure we have the latest /migration zkVersion.
+                applyMigrationOperation("Updated migration state", state -> {
+                    state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration

Review Comment:
Suggestion, as it's not obvious how this leads to the unconditional update.
```suggestion
                    state = state.withMigrationZkVersion(-1); // Causes an unconditional update on /migration via KafkaZkClient#retryRequestsUntilConnected
```

##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -491,6 +536,13 @@ public void run() throws Exception {
                 return;
             }
 
+            if (!curLeaderAndEpoch.equals(leaderAndEpoch)) {

Review Comment:
Are any of the previous actions/checks in this method worth running if the epoch is stale? Why not also add `MetadataChangeEvent` to the set of events that should be skipped in this situation? Might need a different name than `MigrationWriteEvent`, but does the generalization make sense?
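To make the suggested generalization concrete, a rough sketch; the names `EpochSensitiveEvent`, `enqueuedLeaderAndEpoch`, and `isFromStaleEpoch` are hypothetical, not from this PR:

```java
// Hypothetical marker for any driver event that is only valid under the leader
// epoch it was enqueued with, covering ZK writes as well as metadata changes.
interface EpochSensitiveEvent {
    LeaderAndEpoch enqueuedLeaderAndEpoch();
}

// The event loop could then drop any such event once a newer epoch is known,
// instead of special-casing only write events.
private boolean isFromStaleEpoch(MigrationEvent event) {
    return event instanceof EpochSensitiveEvent &&
        ((EpochSensitiveEvent) event).enqueuedLeaderAndEpoch().epoch() < curLeaderAndEpoch.epoch();
}
```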
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))

Review Comment:
Should we test against `MetadataVersion.latestProduction()` instead?
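If so, the change would be along these lines (a sketch, assuming `MetadataVersion.latestProduction()` is available on this branch):

```scala
delta.replay(new FeatureLevelRecord()
  .setName(MetadataVersion.FEATURE_NAME)
  // Exercise the newest production metadata version rather than pinning IBP_3_6_IV1
  .setFeatureLevel(MetadataVersion.latestProduction().featureLevel))
```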
##########
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##########
@@ -230,16 +234,24 @@ private boolean areZkBrokersReadyForMigration() {
      * @param name          A descriptive name of the function that is being applied
      * @param migrationOp   A function which performs some migration operations and possibly transforms our internal state
      */
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) {
+        applyMigrationOperation(name, migrationOp, false);
+    }
+
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp, boolean alwaysLog) {
         ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
+        long startTimeNs = time.nanoseconds();
         ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
-        if (afterState.loggableChangeSinceState(beforeState)) {
-            log.info("{}. Transitioned migration state from {} to {}", name, beforeState, afterState);
+        long durationNs = time.nanoseconds() - startTimeNs;
+        if (afterState.loggableChangeSinceState(beforeState) || alwaysLog) {
+            log.info("{} in {} ns. Transitioned migration state from {} to {}",
+                name, durationNs, beforeState, afterState);

Review Comment:
If `alwaysLog == true`, do we still want to log this if `afterState.equals(beforeState)`?
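If the answer is no, one possible shape for the condition (sketch only, not from the PR):

```java
// Skip the "Transitioned" log line when the state did not actually change,
// even if the caller passed alwaysLog = true.
if (afterState.loggableChangeSinceState(beforeState) ||
        (alwaysLog && !afterState.equals(beforeState))) {
    log.info("{} in {} ns. Transitioned migration state from {} to {}",
        name, durationNs, beforeState, afterState);
}
```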
##########
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##########
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        30000,
+        60000,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta.replay(ZkMigrationState.MIGRATION.toRecord.message)
+
+      val provenance = new MetadataProvenance(210, 11, 1)
+      image = delta.apply(provenance)
+
+      val manifest = LogDeltaManifest.newBuilder()
+        .provenance(provenance)
+        .leaderAndEpoch(leader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image and a leader event into 3000; this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta, image, manifest)
+      driver1.onControllerChange(leader1)
+      driver2.onControllerChange(leader1)
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver1.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Enqueue several metadata deltas and then process a leader change.
+      for (i <- 1 to 1000) {
+        val delta = new MetadataDelta(image)
+        delta.replay(new TopicRecord().setTopicId(Uuid.randomUuid()).setName(s"topic-$i"))
+        val provenance = new MetadataProvenance(210 + i, 11, 1)
+        image = delta.apply(provenance)
+        val manifest = LogDeltaManifest.newBuilder()
+          .provenance(provenance)
+          .leaderAndEpoch(leader1)
+          .numBatches(1)
+          .elapsedNs(100)
+          .numBytes(42)
+          .build()
+        driver1.onMetadataUpdate(delta, image, manifest)
+        driver2.onMetadataUpdate(delta, image, manifest)
+      }
+      Thread.sleep(50) // Give some events a chance to run
+
+      val leader2 = new LeaderAndEpoch(OptionalInt.of(3001), 3)
+      driver1.onControllerChange(leader2)
+
+      Thread.sleep(50) // Artificial delay for 3001 to see the leader change.
+      driver2.onControllerChange(leader2)
+
+      // Wait for driver 2 to become leader in ZK
+      TestUtils.waitUntilTrue(() => zkClient.getControllerId match {
+        case Some(nodeId) => nodeId == 3001
+        case None => false
+      }, "waiting for 3001 to claim ZK leadership")
+
+      TestUtils.waitUntilTrue(() => {
+        val topics = zkClient.getAllTopicsInCluster(false)
+        topics.size == 1000
+      }, "waiting for topics to be created in ZK.")
+
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect first leader to write some topics")
+      assertTrue(topicClient1.createdTopics.nonEmpty, "Expect second leader to write some topics")
+      assertEquals(1000, topicClient1.createdTopics.size + topicClient2.createdTopics.size,
+        "Expect drivers to only write to ZK if they are the leader")
+    } finally {
+      driver1.close()
+      driver2.close()
+      Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+      zookeeper.shutdown()
+      if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")

Review Comment:
Same comment here about the double call to `.shutdown()`. Also, there is a gap between the try-catch and try-finally blocks, where a throwable would take execution out of this method without closing these. Since the only other test in this class – testControllerFailoverZkRace – also makes use of `EmbeddedZookeeper` and `KafkaZkClient`, it might be worth moving this to a common teardown method.
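For illustration, a shared teardown could look roughly like this (a sketch, assuming `zookeeper` and `zkClient` become class-level fields and JUnit's `@AfterEach` is used):

```scala
// Utils.closeQuietly already null-checks and logs any exception, so a single
// cleanup path covers both tests and no extra shutdown() call is needed.
@AfterEach
def tearDown(): Unit = {
  Utils.closeQuietly(zkClient, "KafkaZkClient")
  Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
}
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org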