showuon commented on code in PR #13461:
URL: https://github.com/apache/kafka/pull/13461#discussion_r1182450255


##########
metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java:
##########
@@ -534,5 +421,157 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
                 "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
         }
     }
-}
 
+    @FunctionalInterface
+    interface TopicDualWriteVerifier {
+        void verify(
+            KRaftMigrationDriver driver,
+            CapturingTopicMigrationClient topicClient,
+            CapturingConfigMigrationClient configClient
+        ) throws Exception;
+    }
+
+    public void setupTopicDualWrite(TopicDualWriteVerifier verifier) throws Exception {
+        CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
+
+        CapturingTopicMigrationClient topicClient = new CapturingTopicMigrationClient() {
+            @Override
+            public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor visitor) {
+                IMAGE1.topicsByName().forEach((topicName, topicImage) -> {
+                    Map<Integer, List<Integer>> assignment = new HashMap<>();
+                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
+                        assignment.put(partitionId, IntStream.of(partitionRegistration.replicas).boxed().collect(Collectors.toList()))
+                    );
+                    visitor.visitTopic(topicName, topicImage.id(), assignment);
+
+                    topicImage.partitions().forEach((partitionId, partitionRegistration) ->
+                        visitor.visitPartition(new TopicIdPartition(topicImage.id(), new TopicPartition(topicName, partitionId)), partitionRegistration)
+                    );
+                });
+            }
+        };
+        CapturingConfigMigrationClient configClient = new CapturingConfigMigrationClient();
+        CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder()
+            .setBrokersInZk(0, 1, 2, 3, 4, 5)
+            .setTopicMigrationClient(topicClient)
+            .setConfigMigrationClient(configClient)
+            .build();
+
+        try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
+            3000,
+            new NoOpRecordConsumer(),
+            migrationClient,
+            metadataPropagator,
+            metadataPublisher -> { },
+            new MockFaultHandler("test"),
+            quorumFeatures,
+            mockTime
+        )) {
+            verifier.verify(driver, topicClient, configClient);
+        }
+    }
+
+    @Test
+    public void testTopicDualWriteSnapshot() throws Exception {
+        setupTopicDualWrite((driver, topicClient, configClient) -> {
+            MetadataImage image = new MetadataImage(
+                MetadataProvenance.EMPTY,
+                FeaturesImage.EMPTY,
+                ClusterImage.EMPTY,
+                IMAGE1,
+                ConfigurationsImage.EMPTY,
+                ClientQuotasImage.EMPTY,
+                ProducerIdsImage.EMPTY,
+                AclsImage.EMPTY,
+                ScramImage.EMPTY);
+            MetadataDelta delta = new MetadataDelta(image);
+
+            driver.start();
+            delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
+            delta.replay(zkBrokerRecord(0));
+            delta.replay(zkBrokerRecord(1));
+            delta.replay(zkBrokerRecord(2));
+            delta.replay(zkBrokerRecord(3));
+            delta.replay(zkBrokerRecord(4));
+            delta.replay(zkBrokerRecord(5));
+            MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
+            image = delta.apply(provenance);
+
+            // Publish a delta with this node (3000) as the leader
+            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(3000), 1);
+            driver.onControllerChange(newLeader);
+            driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance, newLeader, 1, 100, 42));
+
+            // Wait for migration
+            TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
+                "Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
+
+            // Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz
+            provenance = new MetadataProvenance(200, 1, 1);
+            delta = new MetadataDelta(image);
+            RecordTestUtils.replayAll(delta, DELTA1_RECORDS);
+            image = delta.apply(provenance);
+            driver.onMetadataUpdate(delta, image, new SnapshotManifest(provenance, 100));
+            driver.migrationState().get(1, TimeUnit.MINUTES);

Review Comment:
   What's the purpose of getting migrationState here? I guess that's because we want to make sure all the events are handled successfully, right?


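For context, a minimal sketch of the pattern the question is getting at. This is not the actual KRaftMigrationDriver internals; all names below (FakeDriver, State, EventQueueBarrierExample) are hypothetical. It assumes the driver processes events on a single-threaded queue, so that migrationState() enqueues its own read behind everything already submitted, and get() on the returned future only returns once earlier events (such as the snapshot update above) have been handled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an event-queue-based driver; not Kafka's real class.
class FakeDriver {
    enum State { DUAL_WRITE }

    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    // An "earlier" event, e.g. applying a metadata snapshot.
    void onMetadataUpdate(Runnable work) {
        queue.submit(work);
    }

    // Enqueues a state read behind all previously submitted events, so the
    // returned future completes only after those events have been processed.
    CompletableFuture<State> migrationState() {
        CompletableFuture<State> future = new CompletableFuture<>();
        queue.submit(() -> future.complete(State.DUAL_WRITE));
        return future;
    }

    void shutdown() {
        queue.shutdown();
    }
}

class EventQueueBarrierExample {
    public static void main(String[] args) throws Exception {
        FakeDriver driver = new FakeDriver();
        driver.onMetadataUpdate(() -> System.out.println("applying snapshot"));
        // Blocks until the snapshot event above has been handled.
        System.out.println(driver.migrationState().get(1, TimeUnit.MINUTES));
        driver.shutdown();
    }
}
```

Under that reading, the bare `driver.migrationState().get(1, TimeUnit.MINUTES)` in the test doubles as a barrier ensuring the snapshot event was fully processed before the test moves on.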
