Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah commented on PR #15918: URL: https://github.com/apache/kafka/pull/15918#issuecomment-2173659574 I ran ZkMigrationIntegrationTest locally several times and didn't get any failures. Same for ZkMigrationFailoverTest. Let's see how the next Jenkins run goes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah commented on code in PR #15918: URL: https://github.com/apache/kafka/pull/15918#discussion_r1642904317

## core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
## @@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
     if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
   }
 }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment: I missed that one. Fixed.
Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
soarez commented on code in PR #15918: URL: https://github.com/apache/kafka/pull/15918#discussion_r1642377375

## core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
## @@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
     if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
   }
 }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment: I see you marked this as resolved, and addressed it for lines ~416 but not for ~309. Was that intentional?
Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah commented on PR #15918: URL: https://github.com/apache/kafka/pull/15918#issuecomment-2168375635 Thanks for the review @soarez! Sorry it took me so long to get back to this. Been busy migrating clusters to KRaft :)
Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah commented on code in PR #15918: URL: https://github.com/apache/kafka/pull/15918#discussion_r1640060787

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
## @@ -491,6 +536,13 @@ public void run() throws Exception {
                 return;
             }
 
+            if (!curLeaderAndEpoch.equals(leaderAndEpoch)) {

Review Comment: Yea, I wondered about this. In general, I'd like to avoid changing too much in MetadataChangeEvent. Notice how MetadataChangeEvent doesn't use checkDriverState. This is because it needs to check both the driver state as well as the metadata log state. For the driver check, this event can actually run in multiple states (since it's how we learn about brokers registering). It also has to check the migration state from the log to avoid trying to do dual writes before the migration is complete (the driver will see partial metadata get published for versions not including metadata transactions). Since it's quite different from other events, I don't think it's worth generalizing.
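[Editor's note] The epoch check quoted above (skip an event when the leader epoch it was created under no longer matches the current one) can be sketched in isolation. The following is a minimal, self-contained Java illustration of the pattern only; `EpochGateDemo` and `Driver` are hypothetical names, not Kafka's classes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of an epoch-gated event queue: each event captures the
// leader epoch at enqueue time and is skipped at run time if the driver has
// since observed a newer epoch.
public class EpochGateDemo {
    static class Driver {
        int currentEpoch = 0;
        final Queue<Runnable> queue = new ArrayDeque<>();
        int executed = 0;
        int skipped = 0;

        void enqueue(int eventEpoch, Runnable body) {
            queue.add(() -> {
                if (eventEpoch != currentEpoch) {
                    skipped++;   // stale event from an older leader epoch
                    return;
                }
                executed++;
                body.run();
            });
        }

        void drain() {
            Runnable r;
            while ((r = queue.poll()) != null) r.run();
        }
    }

    public static void main(String[] args) {
        Driver d = new Driver();
        d.enqueue(0, () -> {});  // enqueued under epoch 0
        d.currentEpoch = 1;      // leadership changed before the event ran
        d.enqueue(1, () -> {});  // enqueued under the new epoch
        d.drain();
        System.out.println(d.executed + " " + d.skipped);  // prints "1 1"
    }
}
```

As the PR description notes, this gating is an optimization rather than a correctness requirement: stale events would fail later anyway, but skipping them avoids wasted work during rapid failover.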
Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah commented on code in PR #15918: URL: https://github.com/apache/kafka/pull/15918#discussion_r1640047647

## metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
## @@ -230,16 +234,24 @@ private boolean areZkBrokersReadyForMigration() {
      * @param name A descriptive name of the function that is being applied
      * @param migrationOp A function which performs some migration operations and possibly transforms our internal state
      */
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) {
+        applyMigrationOperation(name, migrationOp, false);
+    }
+
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp, boolean alwaysLog) {
         ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
+        long startTimeNs = time.nanoseconds();
         ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
-        if (afterState.loggableChangeSinceState(beforeState)) {
-            log.info("{}. Transitioned migration state from {} to {}", name, beforeState, afterState);
+        long durationNs = time.nanoseconds() - startTimeNs;
+        if (afterState.loggableChangeSinceState(beforeState) || alwaysLog) {
+            log.info("{} in {} ns. Transitioned migration state from {} to {}",
+                name, durationNs, beforeState, afterState);

Review Comment: Yea, I think so. For the two use cases of `alwaysLog == true`, the states will always be different. The Principle of Least Surprise makes me think a developer seeing an "alwaysLog" boolean will expect it to in fact always cause a log line to be produced.
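[Editor's note] The pattern in the hunk above (time an operation, then log when the state visibly changed or when logging is forced) can be shown with a minimal self-contained sketch. This is an illustration only; `TimedOpDemo`, the `String` state, and `lastLog` are placeholders, not Kafka's real types or logging.

```java
import java.util.function.UnaryOperator;

// Simplified sketch of the applyMigrationOperation pattern: measure the
// operation's duration and emit a log line if the state changed OR the
// caller passed alwaysLog = true.
public class TimedOpDemo {
    static String state = "A";
    static String lastLog = null;

    static void applyOperation(String name, UnaryOperator<String> op, boolean alwaysLog) {
        String before = state;
        long startNs = System.nanoTime();
        String after = op.apply(before);
        long durationNs = System.nanoTime() - startNs;
        state = after;
        // Log on a visible change, or unconditionally when requested.
        if (!after.equals(before) || alwaysLog) {
            lastLog = String.format("%s in %d ns. Transitioned from %s to %s",
                name, durationNs, before, after);
        }
    }

    public static void main(String[] args) {
        applyOperation("no-op", s -> s, false);
        System.out.println(lastLog);          // null: no change, no forced log
        applyOperation("no-op forced", s -> s, true);
        System.out.println(lastLog != null);  // true: alwaysLog forced a log line
    }
}
```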
Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
soarez commented on code in PR #15918: URL: https://github.com/apache/kafka/pull/15918#discussion_r1606801722

## core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
## @@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
     if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
   }
 }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment: I don't think this makes sense? A call to `.close()` in `EmbeddedZookeeper` delegates to `.shutdown()`. In addition to the direct call to `.close()`, `Utils.closeQuietly()` checks for null and logs any exception in a catch-all. If we want the exception to be thrown, `.shutdown()` is enough and `closeQuietly` can be removed; if instead we want to catch and log a possible exception, then `.shutdown()` needs to be removed.

## core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
## @@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
     if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
   }
 }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta.replay(ZkMigrationState.MIGRATION.toRecord.message)
+
+      val provenance = new MetadataProvenance(210, 11, 1)
+      image = delta.apply(provenance)
+
+      val manifest = LogDeltaManifest.newBuilder()
+        .provenance(provenance)
+        .leaderAndEpoch(leader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image into 3000 image and a leader event, this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta, image, manifest)
+      driver1.onControllerChange(leader1)
+      driver2.onControllerChange(leader1)
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver1.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Enqueue several metadata deltas and then process a leader change.
+      for (i <- 1 to 1000) {
+        val delta = new
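[Editor's note] soarez's point — that `closeQuietly` already null-checks, delegates to `close()` (which itself delegates to `shutdown()`), and swallows exceptions, so the extra `zookeeper.shutdown()` call is redundant — can be demonstrated with a minimal stand-in. `CloseQuietlyDemo` and `FakeZookeeper` are hypothetical classes mimicking the behavior described in the comment, not Kafka's `Utils` or `EmbeddedZookeeper`.

```java
// Minimal stand-in for the closeQuietly idiom: null-check, delegate to
// close(), and swallow (rather than propagate) any exception.
public class CloseQuietlyDemo {
    static int closeCalls = 0;

    static class FakeZookeeper implements AutoCloseable {
        void shutdown() { closeCalls++; }
        @Override public void close() { shutdown(); }  // close() delegates to shutdown()
    }

    static void closeQuietly(AutoCloseable c, String name) {
        if (c == null) return;  // tolerates null, like Utils.closeQuietly
        try {
            c.close();
        } catch (Exception e) {
            // a real implementation would log the failure here
        }
    }

    public static void main(String[] args) {
        FakeZookeeper zk = new FakeZookeeper();
        closeQuietly(zk, "EmbeddedZookeeper");
        zk.shutdown();  // the redundant extra call from the original catch block
        System.out.println(closeCalls);  // prints "2": shut down twice
    }
}
```

The takeaway matches the review: pick one of the two calls depending on whether the cleanup should propagate exceptions, but not both.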
[PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]
mumrah opened a new pull request, #15918: URL: https://github.com/apache/kafka/pull/15918 When becoming the active KRaftMigrationDriver, there is another race condition similar to KAFKA-16171. This time, the race is due to a stale read from ZK. After writing to `/controller` and `/controller_epoch`, it is possible that a read on `/migration` is not linear with the writes that were just made. In other words, we get a stale read on `/migration`. This leads to an inability to sync metadata to ZK due to an incorrect zkVersion on the migration ZNode. The non-linearizability of reads is in fact documented behavior for ZK, so we need to handle it. To fix the stale read, this patch adds a write to `/migration` after updating `/controller` and `/controller_epoch`. This allows us to learn the correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER state. This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness. Writes to ZK are gated on the `/controller_epoch` zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover, there is a lot of processing happening on the controller (i.e., full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail. There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to the past tense to reflect the fact that the operations have already happened by the time the log message is created.
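[Editor's note] The core of the fix — learn the migration ZNode's version from a write rather than trusting a possibly-stale read — can be illustrated with a toy in-memory model. Everything below (`VersionedReadDemo`, `MigrationZnode`, its methods) is an illustrative simplification, not ZooKeeper's API: reads may return a stale snapshot, while writes are linearizable and return the fresh version.

```java
// Toy model of the stale-read problem: a read of the znode version may lag,
// but a write always observes and returns the true current version.
public class VersionedReadDemo {
    static class MigrationZnode {
        int version = 5;       // true current version
        int staleVersion = 3;  // what a non-linearizable read can still return

        int read() { return staleVersion; }  // possibly stale snapshot
        int write() { return ++version; }    // linearizable; returns fresh version

        // A conditional operation, like ZooKeeper's version-checked setData.
        boolean conditionalOp(int expectedVersion) {
            return expectedVersion == version;
        }
    }

    public static void main(String[] args) {
        MigrationZnode znode = new MigrationZnode();

        // Before the fix: trust a read, then conditional writes fail (BadVersion).
        int fromRead = znode.read();
        System.out.println(znode.conditionalOp(fromRead));   // false: stale zkVersion

        // The fix: write to the znode first, which reveals the real version,
        // so subsequent dual-write operations can use it and succeed.
        int fromWrite = znode.write();
        System.out.println(znode.conditionalOp(fromWrite));  // true
    }
}
```

This mirrors why the patch adds a write to `/migration` inside the BECOME_CONTROLLER flow: the write's result carries the authoritative zkVersion, sidestepping ZK's documented allowance for non-linearizable reads.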