ajit97singh commented on code in PR #17004:
URL: https://github.com/apache/kafka/pull/17004#discussion_r1731520505


##########
core/src/test/resources/log4j.properties:
##########
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka=INFO
+log4j.logger.org.apache.kafka=INFO
 

Review Comment:
   Is there any reason for this change other than local testing?



##########
core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala:
##########
@@ -296,13 +296,27 @@ class ZkMigrationIntegrationTest {
   def testMigrateTopicDeletions(zkCluster: ClusterInstance): Unit = {
     // Create some topics in ZK mode
     var admin = zkCluster.createAdminClient()
-    val newTopics = new util.ArrayList[NewTopic]()
-    newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
-    newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
-    newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
-    val createTopicResult = admin.createTopics(newTopics)
-    createTopicResult.all().get(300, TimeUnit.SECONDS)
-    admin.close()
+    try {
+      val newTopics = new util.ArrayList[NewTopic]()
+      newTopics.add(new NewTopic("test-topic-1", 10, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-2", 10, 3.toShort))
+      newTopics.add(new NewTopic("test-topic-3", 10, 3.toShort))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(300, TimeUnit.SECONDS)
+      TestUtils.waitUntilTrue(() => {
+        val topicDescribe = admin.describeTopics(Seq("test-topic-1", 
"test-topic-2", "test-topic-3").asJava)
+        if (topicDescribe.topicNameValues() == null || 
topicDescribe.topicNameValues().size() < 3) {
+          false
+        } else {
+          topicDescribe.topicNameValues().values().stream().allMatch {
+            topic => topic.get(30, 
TimeUnit.SECONDS).partitions().stream().allMatch(part => part.leader() != null)
+          }
+        }
+      }, msg="waiting for topics to be available", waitTimeMs=300)
+    } finally {

Review Comment:
   Can we add a threshold up to which TestUtils.waitUntilTrue() keeps waiting 
before it throws an exception? We could then catch that exception, log it, and 
do the close in the finally block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to