Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-06-17 Thread via GitHub


mumrah commented on PR #15918:
URL: https://github.com/apache/kafka/pull/15918#issuecomment-2173659574

   I ran ZkMigrationIntegrationTest locally several times and didn't get any 
failures. Same for ZkMigrationFailoverTest. Let's see how the next Jenkins run 
goes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-06-17 Thread via GitHub


mumrah commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1642904317


##
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment:
   I missed that one. Fixed.






Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-06-17 Thread via GitHub


soarez commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1642377375


##
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment:
   I see you marked this as resolved, and addressed it for lines ~416 but not 
for ~309. Was that intentional?








Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-06-14 Thread via GitHub


mumrah commented on PR #15918:
URL: https://github.com/apache/kafka/pull/15918#issuecomment-2168375635

   Thanks for the review @soarez! Sorry it took me so long to get back to this. 
Been busy migrating clusters to KRaft :) 





Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-06-14 Thread via GitHub


mumrah commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1640060787


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -491,6 +536,13 @@ public void run() throws Exception {
                 return;
             }
 
+            if (!curLeaderAndEpoch.equals(leaderAndEpoch)) {

Review Comment:
   Yea I wondered about this. In general, I'd like to avoid changing too much 
in MetadataChangeEvent.
   
   Notice how MetadataChangeEvent doesn't use checkDriverState. This is because it needs to check both the driver state as well as the metadata log state. For the driver check, this event can actually run in multiple states (since it's how we learn about brokers registering). It also has to check the migration state from the log to avoid trying to do dual writes before the migration is complete (the driver will see partial metadata get published for versions not including metadata transactions).
   
   Since it's quite different from other events, I don't think it's worth 
generalizing.
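   The epoch gating being discussed can be sketched in isolation. The following is a toy model, not Kafka's actual types — `EpochGatedQueue` and all of its members are illustrative: each event captures the leader epoch at enqueue time, and the loop drops events whose epoch no longer matches rather than letting them issue writes that would fail the epoch check anyway.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy sketch: events enqueued under one leader epoch are skipped if a
// newer epoch is observed before they run. All names are illustrative.
public class EpochGatedQueue {
    private int currentEpoch = 0;
    private final Queue<int[]> events = new ArrayDeque<>(); // {epochAtEnqueue, payload}
    private int processed = 0;
    private int skipped = 0;

    public void enqueue(int payload) {
        // Capture the epoch at the time the event is created.
        events.add(new int[]{currentEpoch, payload});
    }

    public void onLeaderChange(int newEpoch) {
        currentEpoch = newEpoch;
    }

    // Drain the queue, dropping events from older epochs.
    public void drain() {
        int[] e;
        while ((e = events.poll()) != null) {
            if (e[0] != currentEpoch) {
                skipped++;   // stale event: enqueued under an older epoch
            } else {
                processed++; // here the real driver would do its work
            }
        }
    }

    public int processed() { return processed; }
    public int skipped() { return skipped; }
}
```

   In this toy, events enqueued before a leader change are counted as skipped once `drain()` runs, mirroring why a check that isn't strictly required for correctness still avoids wasted processing during rapid failover.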







Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-06-14 Thread via GitHub


mumrah commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1640047647


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java:
##
@@ -230,16 +234,24 @@ private boolean areZkBrokersReadyForMigration() {
      * @param name         A descriptive name of the function that is being applied
      * @param migrationOp  A function which performs some migration operations and possibly transforms our internal state
      */
+
     private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp) {
+        applyMigrationOperation(name, migrationOp, false);
+    }
+
+    private void applyMigrationOperation(String name, KRaftMigrationOperation migrationOp, boolean alwaysLog) {
         ZkMigrationLeadershipState beforeState = this.migrationLeadershipState;
+        long startTimeNs = time.nanoseconds();
         ZkMigrationLeadershipState afterState = migrationOp.apply(beforeState);
-        if (afterState.loggableChangeSinceState(beforeState)) {
-            log.info("{}. Transitioned migration state from {} to {}", name, beforeState, afterState);
+        long durationNs = time.nanoseconds() - startTimeNs;
+        if (afterState.loggableChangeSinceState(beforeState) || alwaysLog) {
+            log.info("{} in {} ns. Transitioned migration state from {} to {}",
+                name, durationNs, beforeState, afterState);

Review Comment:
   Yea, I think so. For the two use cases of `alwaysLog == true`, the states will always be different. The Principle of Least Surprise makes me think that a developer seeing an `alwaysLog` boolean will expect it to always cause a log message to be produced.
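   The pattern under discussion — timing the operation and logging either on a visible state change or unconditionally when `alwaysLog` is set — can be sketched with simplified stand-ins. This is not the actual KRaftMigrationDriver code: state is a plain `String` instead of `ZkMigrationLeadershipState`, and `System.nanoTime()` replaces the injected `Time` source.

```java
import java.util.function.UnaryOperator;

// Simplified stand-in for the applyMigrationOperation overloads
// discussed above. Captures the last log line so the behavior of
// the alwaysLog flag is observable.
public class MigrationOpLogger {
    private String state;
    private String lastLog = null;

    public MigrationOpLogger(String initialState) { this.state = initialState; }

    public void apply(String name, UnaryOperator<String> op) {
        apply(name, op, false); // default: only log visible changes
    }

    public void apply(String name, UnaryOperator<String> op, boolean alwaysLog) {
        String before = state;
        long startNs = System.nanoTime();
        String after = op.apply(before);
        long durationNs = System.nanoTime() - startNs;
        state = after;
        // Log when the state visibly changed, or when the caller forced it.
        if (!after.equals(before) || alwaysLog) {
            lastLog = String.format("%s in %d ns. Transitioned from %s to %s",
                name, durationNs, before, after);
        }
    }

    public String lastLog() { return lastLog; }
    public String state() { return state; }
}
```

   Note that in this sketch `alwaysLog` really does force a log line even for a no-op transition, matching the least-surprise reading of the flag's name.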






Re: [PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-05-20 Thread via GitHub


soarez commented on code in PR #15918:
URL: https://github.com/apache/kafka/pull/15918#discussion_r1606801722


##
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()

Review Comment:
   I don't think this makes sense?
   
   A call to `.close()` in `EmbeddedZookeeper` delegates to `.shutdown()`. Compared to a direct call to `.close()`, `Utils.closeQuietly()` additionally checks for null and logs any exception in a catch-all.
   If we want the exception to be thrown, `.shutdown()` is enough and `closeQuietly` can be removed; if instead we want to catch and log a possible exception, then `.shutdown()` needs to be removed.
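   The redundancy can be seen with a toy model: when `close()` delegates to `shutdown()`, calling a closeQuietly-style helper and then `.shutdown()` shuts the server down twice, and the helper additionally swallows any exception. The classes below are illustrative stand-ins, not the real `EmbeddedZookeeper` or Kafka's `Utils`.

```java
// Toy model of the redundancy pointed out above. All names mirror the
// discussion but the classes are illustrative stand-ins.
public class CleanupSketch {
    static class ToyZookeeper implements AutoCloseable {
        int shutdownCalls = 0;
        public void shutdown() { shutdownCalls++; }
        @Override public void close() { shutdown(); } // close() delegates
    }

    // Simplified closeQuietly: ignore null, log-and-swallow errors.
    static void closeQuietly(AutoCloseable c, String name) {
        if (c == null) return;
        try {
            c.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }

    static int closeQuietlyThenShutdown(ToyZookeeper zk) {
        closeQuietly(zk, "ToyZookeeper"); // first shutdown, exceptions swallowed
        zk.shutdown();                    // second, redundant shutdown
        return zk.shutdownCalls;          // ends up as 2: pick one or the other
    }
}
```

   Either call alone is sufficient; keeping both just doubles the shutdown and muddles the intended exception handling.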



##
core/src/test/scala/integration/kafka/zk/ZkMigrationFailoverTest.scala:
##
@@ -274,4 +288,135 @@ class ZkMigrationFailoverTest extends Logging {
       if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
     }
   }
+
+  @Test
+  def testDriverSkipsEventsFromOlderEpoch(): Unit = {
+    val zookeeper = new EmbeddedZookeeper()
+    var zkClient: KafkaZkClient = null
+    val zkConnect = s"127.0.0.1:${zookeeper.port}"
+    try {
+      zkClient = KafkaZkClient(
+        zkConnect,
+        isSecure = false,
+        3,
+        6,
+        1,
+        Time.SYSTEM,
+        name = "ZkMigrationFailoverTest",
+        new ZKClientConfig)
+    } catch {
+      case t: Throwable =>
+        Utils.closeQuietly(zookeeper, "EmbeddedZookeeper")
+        zookeeper.shutdown()
+        if (zkClient != null) Utils.closeQuietly(zkClient, "KafkaZkClient")
+        throw t
+    }
+
+    val topicClient1 = new CapturingZkTopicMigrationClient(zkClient)
+    val topicClient2 = new CapturingZkTopicMigrationClient(zkClient)
+
+    def buildZkMigrationClient(topicClient: TopicMigrationClient): ZkMigrationClient = {
+      val configClient = new ZkConfigMigrationClient(zkClient, PasswordEncoder.NOOP)
+      val aclClient = new ZkAclMigrationClient(zkClient)
+      val delegationTokenClient = new ZkDelegationTokenMigrationClient(zkClient)
+      new ZkMigrationClient(zkClient, topicClient, configClient, aclClient, delegationTokenClient)
+    }
+
+    val zkMigrationClient1 = buildZkMigrationClient(topicClient1)
+    val zkMigrationClient2 = buildZkMigrationClient(topicClient2)
+
+    val (driver1, faultHandler1) = buildMigrationDriver(3000, zkMigrationClient1)
+    val (driver2, faultHandler2) = buildMigrationDriver(3001, zkMigrationClient2)
+
+    // Initialize data into /controller and /controller_epoch
+    zkClient.registerControllerAndIncrementControllerEpoch(0)
+    var zkState = zkMigrationClient1.claimControllerLeadership(
+      ZkMigrationLeadershipState.EMPTY.withNewKRaftController(3000, 1)
+    )
+
+    // Fake a complete migration
+    zkState = zkState.withKRaftMetadataOffsetAndEpoch(100, 10)
+    zkState = zkMigrationClient1.getOrCreateMigrationRecoveryState(zkState)
+
+    try {
+      driver1.start()
+      driver2.start()
+
+      val leader1 = new LeaderAndEpoch(OptionalInt.of(3000), 2)
+      var image = MetadataImage.EMPTY
+      val delta = new MetadataDelta(image)
+      delta.replay(new FeatureLevelRecord()
+        .setName(MetadataVersion.FEATURE_NAME)
+        .setFeatureLevel(MetadataVersion.IBP_3_6_IV1.featureLevel))
+      delta.replay(ZkMigrationState.MIGRATION.toRecord.message)
+
+      val provenance = new MetadataProvenance(210, 11, 1)
+      image = delta.apply(provenance)
+
+      val manifest = LogDeltaManifest.newBuilder()
+        .provenance(provenance)
+        .leaderAndEpoch(leader1)
+        .numBatches(1)
+        .elapsedNs(100)
+        .numBytes(42)
+        .build()
+
+      // Load an image into 3000 image and a leader event, this lets it become active and sync to ZK
+      driver1.onMetadataUpdate(delta, image, manifest)
+      driver1.onControllerChange(leader1)
+      driver2.onControllerChange(leader1)
+
+      // Wait until new leader has sync'd to ZK
+      TestUtils.waitUntilTrue(
+        () => safeGet(driver1.migrationState()).equals(MigrationDriverState.DUAL_WRITE),
+        "waiting for driver to enter DUAL_WRITE"
+      )
+
+      // Enqueue several metadata deltas and then process a leader change.
+      for (i <- 1 to 1000) {
+        val delta = new

[PR] KAFKA-16667 Avoid stale read in KRaftMigrationDriver [kafka]

2024-05-10 Thread via GitHub


mumrah opened a new pull request, #15918:
URL: https://github.com/apache/kafka/pull/15918

   When becoming the active KRaftMigrationDriver, there is another race 
condition similar to KAFKA-16171. This time, the race is due to a stale read 
from ZK. After writing to `/controller` and `/controller_epoch`, it is possible 
that a read on `/migration` is not linear with the writes that were just made. 
In other words, we get a stale read on `/migration`. This leads to an inability 
to sync metadata to ZK due to incorrect zkVersion on the migration Znode. 
   
   The non-linearizability of reads is in fact documented behavior for ZK, so 
we need to handle it.
   
   To fix the stale read, this patch adds a write to `/migration` after 
updating `/controller` and `/controller_epoch`. This allows us to learn the 
correct zkVersion for the migration ZNode before leaving the BECOME_CONTROLLER 
state.
   
   This patch also adds a check on the current leader epoch when running certain events in KRaftMigrationDriver. Historically, we did not include this check because it is not necessary for correctness: writes to ZK are gated on the `/controller_epoch` zkVersion, and RPCs sent to brokers are gated on the controller epoch. However, during a time of rapid failover, there is a lot of processing happening on the controller (i.e., full metadata sync to ZK and full UMRs sent to brokers), so it is best to avoid running events we know will fail.
   
   There is also a small fix in here to improve the logging of ZK operations. The log messages are changed to past tense to reflect the fact that they have already happened by the time the log message is created.
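   The essence of the stale-read fix — trusting the version returned by our own write rather than a possibly stale read — can be modeled with a toy versioned store. This is only an illustration of the idea; the real driver works against ZooKeeper's versioned setData on the `/migration` ZNode, and every name below is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: reads may be served from a lagging snapshot, but a
// successful conditional write returns the authoritative new version.
// Recording the version from our own write sidesteps the stale read.
public class MigrationZnodeSketch {
    private final Map<String, Integer> authoritative = new HashMap<>();
    private final Map<String, Integer> lagging = new HashMap<>();

    public MigrationZnodeSketch() {
        authoritative.put("/migration", 5);
        lagging.put("/migration", 4); // this replica hasn't caught up yet
    }

    // A read that is not linearized with recent writes may return an
    // out-of-date version, like the stale /migration read in the bug.
    public int staleRead(String path) { return lagging.get(path); }

    // Compare-and-set against the authoritative version; on success,
    // the caller learns the new version directly from the write.
    public int conditionalSet(String path, int expectedVersion) {
        int cur = authoritative.get(path);
        if (cur != expectedVersion) {
            throw new IllegalStateException("version mismatch on " + path);
        }
        authoritative.put(path, cur + 1);
        return cur + 1;
    }
}
```

   Using the stale version in a conditional write fails, while a write made with the true version returns a fresh version that is safe to record before leaving the BECOME_CONTROLLER-style step — which is the shape of the fix described above.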

