artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1476922434


##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -799,6 +814,13 @@ public TopicCommandOptions(String[] args) {
                 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
             excludeInternalTopicOpt = parser.accepts("exclude-internal",
                 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+            useDescribeTopicsApiOpt = parser.accepts("use-describe-topics-api",

Review Comment:
   Instead of an explicit option, can we just try to use the describe topics API
if the broker supports it, and otherwise fall back to the metadata API?
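
   To sketch what I mean (this just mirrors the existing
`supportsDisablingTopicCreation` fallback in `handleDescribeTopicsByNames`; the
flag name and the shared builder return type are illustrative, not the actual
implementation):

   ```
     // Hypothetical flag inside the describeTopics Call; flipped when the broker
     // rejects the new API so the next attempt uses the old metadata path.
     private boolean useDescribeTopicPartitionsApi = true;

     @Override
     AbstractRequest.Builder<?> createRequest(int timeoutMs) {
         if (useDescribeTopicPartitionsApi)
             return new DescribeTopicPartitionsRequest.Builder(new DescribeTopicPartitionsRequestData()
                 .setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
                 .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()));
         // Fallback for brokers that don't support DescribeTopicPartitions.
         return new MetadataRequest.Builder(new MetadataRequestData()
             .setTopics(convertToMetadataRequestTopic(topicNamesList))
             .setAllowAutoTopicCreation(false)
             .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
     }

     @Override
     boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
         if (!useDescribeTopicPartitionsApi)
             return false;                      // metadata path is unsupported too, give up
         useDescribeTopicPartitionsApi = false;
         return true;                           // retry this call with the metadata fallback
     }
   ```

   Then the tool wouldn't need a flag at all, and old brokers would keep working
unchanged.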



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
 
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {
+                Map<String, TopicRequest> pendingTopics =
+                    topicNames.stream().map(topicName -> new TopicRequest().setName(topicName))
+                        .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)
+                    );
 
-            @Override
-            MetadataRequest.Builder createRequest(int timeoutMs) {
-                if (supportsDisablingTopicCreation)
-                    return new MetadataRequest.Builder(new MetadataRequestData()
-                        .setTopics(convertToMetadataRequestTopic(topicNamesList))
-                        .setAllowAutoTopicCreation(false)
-                        .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
-                else
-                    return MetadataRequest.Builder.allTopics();
-            }
+                String partiallyFinishedTopicName = "";
+                int partiallyFinishedTopicNextPartitionId = -1;
+                TopicDescription partiallyFinishedTopicDescription = null;
 
-            @Override
-            void handleResponse(AbstractResponse abstractResponse) {
-                MetadataResponse response = (MetadataResponse) abstractResponse;
-                // Handle server responses for particular topics.
-                Cluster cluster = response.buildCluster();
-                Map<String, Errors> errors = response.errors();
-                for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
-                    String topicName = entry.getKey();
-                    KafkaFutureImpl<TopicDescription> future = entry.getValue();
-                    Errors topicError = errors.get(topicName);
-                    if (topicError != null) {
-                        future.completeExceptionally(topicError.exception());
-                        continue;
+                @Override
+                Call generateCall() {
+                    return new Call("describeTopics", this.deadlineMs, new LeastLoadedNodeProvider()) {
+                        @Override
+                        DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                            DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                                .setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
+                                .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                            if (!partiallyFinishedTopicName.isEmpty()) {
+                                request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                                    .setTopicName(partiallyFinishedTopicName)
+                                    .setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                                );
+                            }
+                            return new DescribeTopicPartitionsRequest.Builder(request);
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) {
+                            DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                            String cursorTopicName = "";
+                            int cursorPartitionId = -1;
+                            if (response.data().nextCursor() != null) {
+                                DescribeTopicPartitionsResponseData.Cursor cursor = response.data().nextCursor();
+                                cursorTopicName = cursor.topicName();
+                                cursorPartitionId = cursor.partitionIndex();
+                            }
+
+                            for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                                String topicName = topic.name();
+                                Errors error = Errors.forCode(topic.errorCode());
+
+                                KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+                                if (error != Errors.NONE) {
+                                    future.completeExceptionally(error.exception());
+                                    topicFutures.remove(topicName);
+                                    pendingTopics.remove(topicName);
+                                    if (partiallyFinishedTopicName.equals(topicName)) {
+                                        partiallyFinishedTopicName = "";
+                                        partiallyFinishedTopicNextPartitionId = -1;
+                                        partiallyFinishedTopicDescription = null;
+                                    }
+                                    if (cursorTopicName.equals(topicName)) {
+                                        cursorTopicName = "";
+                                        cursorPartitionId = -1;
+                                    }
+                                    continue;
+                                }
+
+                                TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic);
+
+                                if (partiallyFinishedTopicName.equals(topicName)) {
+                                    if (partiallyFinishedTopicDescription == null) {
+                                        partiallyFinishedTopicDescription = currentTopicDescription;
+                                    } else {
+                                        partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                                    }
+
+                                    if (!cursorTopicName.equals(topicName)) {
+                                        pendingTopics.remove(topicName);
+                                        future.complete(partiallyFinishedTopicDescription);
+                                        partiallyFinishedTopicDescription = null;
+                                    }
+                                    continue;
+                                }
+
+                                if (cursorTopicName.equals(topicName)) {
+                                    partiallyFinishedTopicDescription = currentTopicDescription;
+                                    continue;
+                                }
+
+                                pendingTopics.remove(topicName);
+                                future.complete(currentTopicDescription);
+                            }
+                            partiallyFinishedTopicName = cursorTopicName;
+                            partiallyFinishedTopicNextPartitionId = cursorPartitionId;
+                            if (pendingTopics.isEmpty()) {
+                                handleNonExistingTopics();
+                                nextRun.complete(false);
+                            } else {
+                                nextRun.complete(true);
+                            }
+                        }
+
+                        @Override
+                        boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                            return false;
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            completeAllExceptionally(topicFutures.values(), throwable);
+                            nextRun.completeExceptionally(throwable);
+                        }
+                    };
+                }
+
+                void handleNonExistingTopics() {
+                    for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) {
+                        if (!entry.getValue().isDone()) {
+                            entry.getValue().completeExceptionally(new UnknownTopicOrPartitionException("Topic " + entry.getKey() + " not found."));
+                        }
                     }
-                    if (!cluster.topics().contains(topicName)) {
-                        future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found."));
-                        continue;
+                }
+            };
+            call.run();

Review Comment:
   It's my understanding that the reason we got into this pagination business is
that data from all partitions may not fit in memory at the same time, so we'd
take a streaming approach: retrieve the data in chunks and print each chunk as
it arrives, so that we use O(1) memory instead of O(N). Looking at the
end-to-end workflow, it looks like we still wait for all results to be
collected in memory, so it doesn't look like we've fixed the problem.
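
   To sketch the kind of streaming shape I have in mind (the consumer interface
and the partitionInfos() helper below are hypothetical, not existing API):

   ```
     // Hypothetical: the caller supplies a consumer and the admin client pushes each
     // page to it as the page arrives, instead of buffering a full TopicDescription.
     public interface TopicDescriptionChunkConsumer {
         void onPartitions(String topicName, List<TopicPartitionInfo> partitions);
         void onTopicComplete(String topicName);
     }

     // Inside handleResponse(), per page of the DescribeTopicPartitions response:
     for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
         chunkConsumer.onPartitions(topic.name(), partitionInfos(topic));  // e.g. TopicCommand prints here
         if (!topic.name().equals(cursorTopicName))
             chunkConsumer.onTopicComplete(topic.name());
     }
     // Only the cursor (topic name + next partition id) is carried over to the next
     // request, so memory stays proportional to one page rather than all partitions.
   ```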



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f
             }
         }
         final long now = time.milliseconds();
-        Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
-            new LeastLoadedNodeProvider()) {
 
-            private boolean supportsDisablingTopicCreation = true;
+        if (options.useDescribeTopicsApi()) {
+            RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Instead of building the recurring call framework, can we just schedule the 
next call recursively?  Something like 
   
   ```
     Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
       DescribeTopicPartitionsRequestData.Cursor currentCursor = new DescribeTopicPartitionsRequestData.Cursor();
       // <...>
       @Override
       void handleResponse(AbstractResponse abstractResponse) {
         // ... Do the needful ...
         // ...
         if (hasMore) {
           // ... Set new cursor ...
           // ...
           runnable.call(this, now);
         }
       }
       // <...>
     }
   ```


