showuon commented on code in PR #15022:
URL: https://github.com/apache/kafka/pull/15022#discussion_r1465893905


##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -757,16 +759,64 @@ class ZkMigrationIntegrationTest {
     }
   }
 
-  def verifyTopicPartitionMetadata(topicName: String, partitions: 
Seq[TopicPartition], zkClient: KafkaZkClient): Unit = {
+  def createTopic(topicName: String, numPartitions: Int, replicationFactor: 
Short, configs: util.Map[String, String], admin: Admin): Unit = {
+    val newTopic = new NewTopic(topicName, numPartitions, 
replicationFactor).configs(configs)
+    val createTopicResult = 
admin.createTopics(util.Collections.singletonList(newTopic))
+    createTopicResult.all.get(60, TimeUnit.SECONDS)
+
+    TestUtils.waitUntilTrue(() => {
+      admin.listTopics.names.get.contains(topicName)
+    }, s"Unable to find topic $topicName")
+  }
+
+  def verifyTopic(topicName: String, numPartitions: Int, replicationFactor: 
Short, configs: util.Map[String, String], admin: Admin, zkClient: 
KafkaZkClient): Unit = {
+    // Verify the changes are in ZK
+    verifyZKTopicPartitionMetadata(topicName, numPartitions, 
replicationFactor, zkClient)
+    verifyZKTopicConfigs(topicName, configs, zkClient)
+    // Verify the changes are in KRaft
+    verifyKRaftTopicPartitionMetadata(topicName, numPartitions, 
replicationFactor, admin)
+    verifyKRaftTopicConfigs(topicName, configs, admin)
+  }

Review Comment:
   Nice verification for ZK and KRaft.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -649,29 +650,30 @@ class ZkMigrationIntegrationTest {
         30000)
 
       // Alter the metadata
-      log.info("Create new topic with AdminClient")
       admin = zkCluster.createAdminClient()
-      val newTopics = new util.ArrayList[NewTopic]()
-      newTopics.add(new NewTopic(topicName, 2, 3.toShort))
-      val createTopicResult = admin.createTopics(newTopics)
-      createTopicResult.all().get(60, TimeUnit.SECONDS)
+      log.info(s"Create new topic $topic1 with AdminClient with some configs")
+      val topicConfigs = util.Collections.singletonMap("cleanup.policy", 
"compact")
+      createTopic(topic1, 2, 3.toShort, topicConfigs, admin)
+      verifyTopic(topic1, 2, 3.toShort, topicConfigs, admin, zkClient)

Review Comment:
   nit: We can introduce two named variables for the literals 2 and 3, e.g. 
`originalPartitionCount` and `replicationFactor`.



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -610,8 +611,8 @@ class ZkMigrationIntegrationTest {
     new ClusterConfigProperty(key = "listener.security.protocol.map", value = 
"EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
   ))
   def testNewAndChangedTopicsInDualWrite(zkCluster: ClusterInstance): Unit = {
-    // Create a topic in ZK mode
-    val topicName = "test"
+    val topic1 = "test1"
+    val topic2 = "test2"
     var admin = zkCluster.createAdminClient()

Review Comment:
   I can see that some admin clients are not closed in this test suite. It 
would be good to close them, either in this PR or in a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to