artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191


##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##########
@@ -36,28 +38,38 @@
 public class DescribeTopicsResult {
     private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
     private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
+    private final Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator;
 
     @Deprecated
     protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this(null, futures);
+        this(null, futures, null);
     }
 
     // VisibleForTesting
-    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
-        if (topicIdFutures != null && nameFutures != null)
-            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
-        if (topicIdFutures == null && nameFutures == null)
-            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
+    protected DescribeTopicsResult(
+        Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures,
+        Map<String, KafkaFuture<TopicDescription>> nameFutures,
+        Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator
+    ) {
+        if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null)
+            throw new IllegalArgumentException("topicIdFutures and nameFutures and nameFutureIterator cannot both be specified.");

Review Comment:
   nit: using "both" implies that there are two things, but we now have three.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
 
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Didn't quite get how the recurring call framework enables lazy generation.  RecurringCall.run() just runs the whole thing in one go.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
 
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Looking at the current logic, I cannot make sense of it.  For some reason we get all topic names that fit under the batch size, then fetch all partitions for those topics and hold them in memory.
   
   Why did we bother with all the paging complexity if all we do is load the partitions of 2000 topics into memory?  Can we just iterate topic by topic (as it seems we get the list of all topics anyway) and fetch partitions for each one?
   
   With paging, I would've expected something like this:
   
   1. start iteration -- issue one call; once the first call completes, maybe issue a few more (say 3) calls ahead for pipelining.
   2. next -- iterate over the current batch.
   3. once the current batch is exhausted, switch to the next batch and pipeline one more call.
   
   This way, if the caller finishes iteration early or iterates slowly, we won't keep piling results up in memory.



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I
             } else {
                 ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
             }
-            List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
 
             if (!topicIds.isEmpty()) {
                 Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics =
                     adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-                topicDescriptions = new ArrayList<>(descTopics.values());
+                describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts);
+                return;
             }
 
             if (!topics.isEmpty()) {
-                Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
-                    adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-                topicDescriptions = new ArrayList<>(descTopics.values());
+                final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000);
+                try {
+                    Iterator<Map.Entry<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>> descTopicIterator =
+                        adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+                            new DescribeTopicsOptions().useDescribeTopicsApi(true)
+                                .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+                    while (descTopicIterator.hasNext()) {
+                        List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
+                        for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) {
+                            org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get();
+                            topicDescriptions.add(topicDescription);
+                            counter += topicDescription.partitions().size();
+                        }
+                        describeTopicsFollowUp(topicDescriptions, opts);
+                    }
+                } catch (Exception e) {
+                    if (e.toString().contains("UnsupportedVersionException")) {
+                        // Retry the request with Metadata API.
+                        Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
+                            adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+                                new DescribeTopicsOptions().useDescribeTopicsApi(false)

Review Comment:
   Does the partition size limit option get respected in this case?



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I
             } else {
                 ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
             }
-            List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
 
             if (!topicIds.isEmpty()) {
                 Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics =
                     adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-                topicDescriptions = new ArrayList<>(descTopics.values());
+                describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts);
+                return;
             }
 
             if (!topics.isEmpty()) {
-                Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
-                    adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-                topicDescriptions = new ArrayList<>(descTopics.values());
+                final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000);
+                try {
+                    Iterator<Map.Entry<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>> descTopicIterator =
+                        adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+                            new DescribeTopicsOptions().useDescribeTopicsApi(true)
+                                .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+                    while (descTopicIterator.hasNext()) {
+                        List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
+                        for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) {
+                            org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get();
+                            topicDescriptions.add(topicDescription);
+                            counter += topicDescription.partitions().size();
+                        }
+                        describeTopicsFollowUp(topicDescriptions, opts);

Review Comment:
   Will this properly handle cases where a topic with partitions 0-42 is printed in one iteration and then the same topic with partitions 43-77 is printed in the next iteration?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##########
@@ -36,28 +38,38 @@
 public class DescribeTopicsResult {
     private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
     private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
+    private final Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator;
 
     @Deprecated
     protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) {
-        this(null, futures);
+        this(null, futures, null);
     }
 
     // VisibleForTesting
-    protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
-        if (topicIdFutures != null && nameFutures != null)
-            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified.");
-        if (topicIdFutures == null && nameFutures == null)
-            throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null.");
+    protected DescribeTopicsResult(
+        Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures,
+        Map<String, KafkaFuture<TopicDescription>> nameFutures,
+        Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator
+    ) {
+        if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null)

Review Comment:
   If I understand the intent of this check correctly, it wants to verify that exactly one of the three is specified.  As written, it would allow any two of the three to be specified.



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I
             } else {
                 ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists());
             }
-            List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
 
             if (!topicIds.isEmpty()) {
                 Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics =
                     adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-                topicDescriptions = new ArrayList<>(descTopics.values());
+                describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts);
+                return;
             }
 
             if (!topics.isEmpty()) {
-                Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
-                    adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-                topicDescriptions = new ArrayList<>(descTopics.values());
+                final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000);
+                try {
+                    Iterator<Map.Entry<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>> descTopicIterator =
+                        adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+                            new DescribeTopicsOptions().useDescribeTopicsApi(true)
+                                .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+                    while (descTopicIterator.hasNext()) {
+                        List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>();
+                        for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) {
+                            org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get();
+                            topicDescriptions.add(topicDescription);
+                            counter += topicDescription.partitions().size();
+                        }
+                        describeTopicsFollowUp(topicDescriptions, opts);
+                    }
+                } catch (Exception e) {
+                    if (e.toString().contains("UnsupportedVersionException")) {
+                        // Retry the request with Metadata API.
+                        Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics =
+                            adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+                                new DescribeTopicsOptions().useDescribeTopicsApi(false)
+                                    .partitionSizeLimitPerResponse(opts.partitionSizeLimitPerResponse().orElse(2000))).allTopicNames().get();
+                        describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts);
+                        return;
+                    } else {
+                        throw e;
+                    }
+                }
+                return;
             }
 
+            describeTopicsFollowUp(Collections.emptyList(), opts);
+        }
+
+        private void describeTopicsFollowUp(
Review Comment:
   Looks like this function prints the topic descriptions.  Should we name it to describe its purpose?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
