artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506982605


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(

Review Comment:
   I'm still not sure why we need the RecurringCall. I think something like this would be much less code:
   
   ```
     // Optional.empty() marks the end of the stream; ArrayBlockingQueue rejects null elements.
     ArrayBlockingQueue<Optional<DescribeTopicPartitionsResult>> results = new ArrayBlockingQueue<>(5);
     Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
       DescribeTopicPartitionsRequestData.Cursor currentCursor = new DescribeTopicPartitionsRequestData.Cursor();
       // <...>
       @Override
       void handleResponse(AbstractResponse abstractResponse) {
         // ... Process the response ...
         results.put(Optional.of(...));
         // ...
         if (hasMore) {
           // ... Set new cursor ...
           // ...
           runnable.call(this, now);
         } else {
           results.put(Optional.empty());
         }
       }
       // <...>
     };
   
     runnable.call(call, time.milliseconds());
     while (true) {
       Optional<DescribeTopicPartitionsResult> result = results.take();
       if (!result.isPresent())
         break;
       subscriber.onNext(result.get());
     }
   ```
   
   And we won't need to create extra threads in TopicCommand, etc.
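
For illustration only, here is a minimal, self-contained sketch of the bounded-queue handoff suggested above. It is not Kafka code: a plain thread stands in for the admin client's I/O thread, strings stand in for `DescribeTopicPartitionsResult`, and `QueueHandoffSketch`, `fetchPages`, and `consume` are hypothetical names. It uses `Optional.empty()` as the end-of-stream marker because `ArrayBlockingQueue` throws `NullPointerException` on null elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class QueueHandoffSketch {

    // Hypothetical producer: stands in for handleResponse() being invoked once per page.
    static void fetchPages(List<List<String>> pages, BlockingQueue<Optional<String>> results)
            throws InterruptedException {
        for (List<String> page : pages) {
            for (String item : page) {
                results.put(Optional.of(item)); // blocks while the queue is full
            }
        }
        results.put(Optional.empty()); // end-of-stream marker (null is not permitted)
    }

    // Runs on the caller's thread and forwards each result to onNext until the marker arrives.
    static void consume(List<List<String>> pages, Consumer<String> onNext) {
        BlockingQueue<Optional<String>> results = new ArrayBlockingQueue<>(5);
        // Assumption: this thread simulates the client's existing I/O thread.
        Thread producer = new Thread(() -> {
            try {
                fetchPages(pages, results);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        try {
            while (true) {
                Optional<String> result = results.take();
                if (!result.isPresent()) {
                    break;
                }
                onNext.accept(result.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for results", e);
        }
    }

    public static void main(String[] args) {
        // Seven items across two pages, so the capacity-5 queue has to apply backpressure.
        List<List<String>> pages = Arrays.asList(
            Arrays.asList("topic-a", "topic-b", "topic-c"),
            Arrays.asList("topic-d", "topic-e", "topic-f", "topic-g"));
        List<String> seen = new ArrayList<>();
        consume(pages, seen::add);
        System.out.println(seen);
    }
}
```

The bounded capacity is what provides backpressure: the producer cannot run more than five results ahead of the subscriber, and the caller's own thread does the consuming.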


