chia7712 commented on code in PR #20108:
URL: https://github.com/apache/kafka/pull/20108#discussion_r2191180839


##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -258,7 +257,63 @@ default Set<GroupProtocol> supportedGroupProtocols() {
     //---------------------------[wait]---------------------------//
 
     default void waitTopicDeletion(String topic) throws InterruptedException {
-        waitForTopic(topic, 0);
+        Collection<KafkaBroker> brokers = aliveBrokers().values();
+        // wait for metadata
+        TestUtils.waitForCondition(
+            () -> brokers.stream().allMatch(
+                broker -> 
broker.metadataCache().numPartitions(topic).isEmpty()),
+                60000L, topic + " metadata not propagated after 60000 ms");
+
+        for (ControllerServer controller : controllers().values()) {
+            long controllerOffset = 
controller.raftManager().replicatedLog().endOffset().offset() - 1;
+            TestUtils.waitForCondition(
+                () -> brokers.stream().allMatch(broker -> ((BrokerServer) 
broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset),
+                60000L, "Timeout waiting for controller metadata propagating 
to brokers");
+        }
+
+        List<TopicPartition> topicPartitions = List.of(new 
TopicPartition(topic, 0));
+
+        // Ensure that the topic-partition has been deleted from all brokers' 
replica managers
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                        topicPartitions.stream().allMatch(tp -> 
broker.replicaManager().onlinePartition(tp).isEmpty())),
+                "Replica manager's should have deleted all of this topic's 
partitions");
+
+        // Ensure that logs from all replicas are deleted
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                        topicPartitions.stream().allMatch(tp -> 
broker.logManager().getLog(tp, false).isEmpty())),
+                "Replica logs not deleted after delete topic is complete");
+
+        // Ensure that the topic is removed from all cleaner offsets
+        TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker ->
+                        topicPartitions.stream().allMatch(tp -> {
+                            List<File> liveLogDirs = 
CollectionConverters.asJava(broker.logManager().liveLogDirs());

Review Comment:
   Please fix the indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to