This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec151c82783 KAFKA-16563: retry pollEvent in KRaftMigrationDriver for 
retriable errors (#15732)
ec151c82783 is described below

commit ec151c82783f75de3ee755c80ba9ccfbe338c512
Author: Luke Chen <show...@gmail.com>
AuthorDate: Mon Apr 29 18:44:47 2024 +0900

    KAFKA-16563: retry pollEvent in KRaftMigrationDriver for retriable errors 
(#15732)
    
    While migrating a cluster from ZK to KRaft, we encountered an issue where 
the migration hung and the ZkMigrationState could not move to the MIGRATION 
state. The cause is that the PollEvent did not retry on the retriable 
MigrationClientException (ZK client retriable errors), although it should. This PR 
fixes that and adds a test. Because of the missing retry, the poll event stops 
polling, which causes the KRaftMigrationDriver to hang.
    
    Reviewers: Luke Chen <show...@gmail.com>, Igor Soarez<soa...@apple.com>, 
Akhilesh C <akhilesh...@users.noreply.github.com>
---
 .../metadata/migration/KRaftMigrationDriver.java   | 36 +++++----
 .../migration/KRaftMigrationDriverTest.java        | 94 ++++++++++++++++++----
 2 files changed, 100 insertions(+), 30 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
index 4c796f9eade..7cf82b8762c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationDriver.java
@@ -165,19 +165,6 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
         return stateFuture;
     }
 
-    private void recoverMigrationStateFromZK() {
-        applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
-        String maybeDone = 
migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
-        log.info("Initial migration of ZK metadata is {}.", maybeDone);
-
-        // Once we've recovered the migration state from ZK, install this 
class as a metadata publisher
-        // by calling the initialZkLoadHandler.
-        initialZkLoadHandler.accept(this);
-
-        // Transition to INACTIVE state and wait for leadership events.
-        transitionTo(MigrationDriverState.INACTIVE);
-    }
-
     private boolean isControllerQuorumReadyForMigration() {
         Optional<String> notReadyMsg = 
this.quorumFeatures.reasonAllControllersZkMigrationNotReady(
                 image.features().metadataVersion(), 
image.cluster().controllers());
@@ -414,7 +401,7 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
     /**
      * An event generated by a call to {@link 
MetadataPublisher#onControllerChange}. This will not be called until
      * this class is registered with {@link 
org.apache.kafka.image.loader.MetadataLoader}. The registration happens
-     * after the migration state is loaded from ZooKeeper in {@link 
#recoverMigrationStateFromZK}.
+     * after the migration state is loaded from ZooKeeper in {@link 
RecoverMigrationStateFromZKEvent}.
      */
     class KRaftLeaderEvent extends MigrationEvent {
         private final LeaderAndEpoch leaderAndEpoch;
@@ -786,12 +773,31 @@ public class KRaftMigrationDriver implements 
MetadataPublisher {
         }
     }
 
+    class RecoverMigrationStateFromZKEvent extends MigrationEvent {
+        @Override
+        public void run() throws Exception {
+            if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) {
+                applyMigrationOperation("Recovering migration state from ZK", 
zkMigrationClient::getOrCreateMigrationRecoveryState);
+                String maybeDone = 
migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
+                log.info("Initial migration of ZK metadata is {}.", maybeDone);
+
+                // Once we've recovered the migration state from ZK, install 
this class as a metadata publisher
+                // by calling the initialZkLoadHandler.
+                initialZkLoadHandler.accept(KRaftMigrationDriver.this);
+
+                // Transition to INACTIVE state and wait for leadership events.
+                transitionTo(MigrationDriverState.INACTIVE);
+            }
+        }
+    }
+
     class PollEvent extends MigrationEvent {
+
         @Override
         public void run() throws Exception {
             switch (migrationState) {
                 case UNINITIALIZED:
-                    recoverMigrationStateFromZK();
+                    eventQueue.append(new RecoverMigrationStateFromZKEvent());
                     break;
                 case INACTIVE:
                     // Nothing to do when the driver is inactive. We must wait 
until a KRaftLeaderEvent
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
index dea5e62db96..14f347ca1e1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java
@@ -253,7 +253,7 @@ public class KRaftMigrationDriverTest {
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, registerControllers);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(1));
@@ -338,7 +338,7 @@ public class KRaftMigrationDriverTest {
             MetadataDelta delta = new MetadataDelta(image);
             setupDeltaForMigration(delta, true);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(1));
             delta.replay(zkBrokerRecord(2));
@@ -363,6 +363,62 @@ public class KRaftMigrationDriverTest {
         }
     }
 
+    @Test
+    public void testMigrationWithClientExceptionWhileMigratingZnodeCreation() 
throws Exception {
+        CountingMetadataPropagator metadataPropagator = new 
CountingMetadataPropagator();
+        // suppose the ZNode creation failed 3 times
+        CountDownLatch createZnodeAttempts = new CountDownLatch(3);
+        CapturingMigrationClient migrationClient = new 
CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)),
+                new CapturingTopicMigrationClient(),
+                new CapturingConfigMigrationClient(),
+                new CapturingAclMigrationClient(),
+                new CapturingDelegationTokenMigrationClient(),
+                CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
+            @Override
+            public ZkMigrationLeadershipState 
getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
+                if (createZnodeAttempts.getCount() == 0) {
+                    this.setMigrationRecoveryState(initialState);
+                    return initialState;
+                } else {
+                    createZnodeAttempts.countDown();
+                    throw new MigrationClientException("Some kind of ZK 
error!");
+                }
+            }
+        };
+        MockFaultHandler faultHandler = new 
MockFaultHandler("testMigrationClientExpiration");
+        KRaftMigrationDriver.Builder builder = defaultTestBuilder()
+                .setZkMigrationClient(migrationClient)
+                .setFaultHandler(faultHandler)
+                .setPropagator(metadataPropagator);
+        try (KRaftMigrationDriver driver = builder.build()) {
+            MetadataImage image = MetadataImage.EMPTY;
+            MetadataDelta delta = new MetadataDelta(image);
+            setupDeltaForMigration(delta, true);
+
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
+
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+            // Before leadership claiming, the 
getOrCreateMigrationRecoveryState should be able to get correct state
+            assertTrue(createZnodeAttempts.await(1, TimeUnit.MINUTES));
+
+            // Notify the driver that it is the leader
+            driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 
1));
+            // Publish metadata of all the ZK brokers being ready
+            driver.onMetadataUpdate(delta, image, 
logDeltaManifestBuilder(provenance,
+                    new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
+
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                    "Waiting for KRaftMigrationDriver to enter DUAL_WRITE 
state");
+
+            Assertions.assertNull(faultHandler.firstException());
+        }
+    }
+
     private void setupDeltaForMigration(
         MetadataDelta delta,
         boolean registerControllers
@@ -413,7 +469,7 @@ public class KRaftMigrationDriverTest {
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             if (allNodePresent) {
                 setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 
5, 6), Arrays.asList());
             } else {
@@ -469,7 +525,7 @@ public class KRaftMigrationDriverTest {
             migrationClient.setMigrationRecoveryState(
                 
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(1));
             delta.replay(zkBrokerRecord(2));
@@ -483,7 +539,7 @@ public class KRaftMigrationDriverTest {
                 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
 
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION 
state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
         }
     }
 
@@ -546,7 +602,7 @@ public class KRaftMigrationDriverTest {
                 DelegationTokenImage.EMPTY);
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, true);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(0));
@@ -565,7 +621,7 @@ public class KRaftMigrationDriverTest {
 
             // Wait for migration
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION 
state");
+                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
 
             // Modify topics in a KRaft snapshot -- delete foo, modify bar, 
add baz, add new foo, add bam, delete bam
             provenance = new MetadataProvenance(200, 1, 1);
@@ -601,7 +657,7 @@ public class KRaftMigrationDriverTest {
                 DelegationTokenImage.EMPTY);
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, true);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(0));
@@ -656,7 +712,7 @@ public class KRaftMigrationDriverTest {
                 DelegationTokenImage.EMPTY);
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, true);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(0));
@@ -673,7 +729,7 @@ public class KRaftMigrationDriverTest {
             driver.onControllerChange(newLeader);
 
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
-                "Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
+                "Waiting for KRaftMigrationDriver to enter 
WAIT_FOR_CONTROLLER_QUORUM state");
 
             driver.onMetadataUpdate(delta, image, 
logDeltaManifestBuilder(provenance, newLeader).build());
 
@@ -711,7 +767,7 @@ public class KRaftMigrationDriverTest {
                 DelegationTokenImage.EMPTY);
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, true);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(0));
@@ -778,7 +834,7 @@ public class KRaftMigrationDriverTest {
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, true);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(1));
@@ -798,7 +854,7 @@ public class KRaftMigrationDriverTest {
                 new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
 
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                    "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION 
state");
+                    "Waiting for KRaftMigrationDriver to enter DUAL_WRITE 
state");
             assertEquals(1, migrationBeginCalls.get());
         }
     }
@@ -864,7 +920,7 @@ public class KRaftMigrationDriverTest {
             MetadataImage image = MetadataImage.EMPTY;
             MetadataDelta delta = new MetadataDelta(image);
 
-            driver.start();
+            startAndWaitForRecoveringMigrationStateFromZK(driver);
             setupDeltaForMigration(delta, true);
             delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
             delta.replay(zkBrokerRecord(1));
@@ -881,10 +937,18 @@ public class KRaftMigrationDriverTest {
                     new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());
 
             TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
-                    "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION 
state");
+                    "Waiting for KRaftMigrationDriver to enter DUAL_WRITE 
state");
 
             assertEquals(expectedBatchCount, batchesPassedToController.size());
             assertEquals(expectedRecordCount, 
batchesPassedToController.stream().mapToInt(List::size).sum());
         }
     }
+
+    // Wait until the driver has recovered the migration state from ZK. This 
simulates the driver having been installed as the metadata publisher,
+    // so that it can receive onControllerChange (KRaftLeaderEvent) and 
onMetadataUpdate (MetadataChangeEvent) events.
+    private void 
startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) 
throws InterruptedException {
+        driver.start();
+        TestUtils.waitForCondition(() -> driver.migrationState().get(1, 
TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE),
+                "Waiting for KRaftMigrationDriver to enter INACTIVE state");
+    }
 }

Reply via email to