chia7712 commented on code in PR #20108: URL: https://github.com/apache/kafka/pull/20108#discussion_r2191180839
########## test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java: ########## @@ -258,7 +257,63 @@ default Set<GroupProtocol> supportedGroupProtocols() { //---------------------------[wait]---------------------------// default void waitTopicDeletion(String topic) throws InterruptedException { - waitForTopic(topic, 0); + Collection<KafkaBroker> brokers = aliveBrokers().values(); + // wait for metadata + TestUtils.waitForCondition( + () -> brokers.stream().allMatch( + broker -> broker.metadataCache().numPartitions(topic).isEmpty()), + 60000L, topic + " metadata not propagated after 60000 ms"); + + for (ControllerServer controller : controllers().values()) { + long controllerOffset = controller.raftManager().replicatedLog().endOffset().offset() - 1; + TestUtils.waitForCondition( + () -> brokers.stream().allMatch(broker -> ((BrokerServer) broker).sharedServer().loader().lastAppliedOffset() >= controllerOffset), + 60000L, "Timeout waiting for controller metadata propagating to brokers"); + } + + List<TopicPartition> topicPartitions = List.of(new TopicPartition(topic, 0)); + + // Ensure that the topic-partition has been deleted from all brokers' replica managers + TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker -> + topicPartitions.stream().allMatch(tp -> broker.replicaManager().onlinePartition(tp).isEmpty())), + "Replica manager's should have deleted all of this topic's partitions"); + + // Ensure that logs from all replicas are deleted + TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker -> + topicPartitions.stream().allMatch(tp -> broker.logManager().getLog(tp, false).isEmpty())), + "Replica logs not deleted after delete topic is complete"); + + // Ensure that the topic is removed from all cleaner offsets + TestUtils.waitForCondition(() -> brokers.stream().allMatch(broker -> + topicPartitions.stream().allMatch(tp -> { + List<File> liveLogDirs = 
CollectionConverters.asJava(broker.logManager().liveLogDirs()); Review Comment: Please fix the indentation of this line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org