artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1506982605
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) { return partitionInfo.leader(); } + @Override + public void describeTopics( + TopicCollection topics, + DescribeTopicsOptions options, + AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) { + if (topics instanceof TopicIdCollection) { + subscriber.onError( + new IllegalArgumentException("Currently the describeTopics subscription mode does not support topic IDs.") + ); + return; + } + if (!(topics instanceof TopicNameCollection)) { + subscriber.onError( + new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.") + ); + return; + } + + TreeSet<String> topicNames = new TreeSet<>(); + ((TopicNameCollection) topics).topicNames().forEach(topicName -> { + if (topicNameIsUnrepresentable(topicName)) { + subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request."))); + } else { + topicNames.add(topicName); + } + }); + + RecurringCall call = new RecurringCall( Review Comment: I'm still not sure why we need the RecurringCall, I think something like this should be much less code: ``` ArrayBlockingQueue<DescribeTopicPartitionsResult> results = new ArrayBlockingQueue<>(5); Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { DescribeTopicPartitionsRequestData.Cursor currentCursor = new DescribeTopicPartitionsRequestData.Cursor(); // <...> @Override void handleResponse(AbstractResponse abstractResponse) { // ... Do the needful ... results.put(...); // ... if (hasMore) { // ... Set new cursor ... // ... runnable.call(this, now); } else { results.put(null); } } // <...> } runnable.call(call, time.milliseconds()); while (true) { DescribeTopicPartitionsResult result = results.take(); if (result == null) break; subscriber.onNext(result); } ``` And we won't need to create extra threads on the TopicCommand and etc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org