artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520443872


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+            TopicDescription nextTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {
+                    request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                        .setTopicName(partiallyFinishedTopicDescription.name())
+                        
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                    );
+                }
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+                for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = 
topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+                        
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        continue;
+                    }
+
+                    if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+                        // This is the first time this topic being pointed by 
the cursor.
+                        nextTopicDescription = currentTopicDescription;
+                        continue;
+                    }

Review Comment:
   why not just 
   ```
     if (partiallyFinishedTopicDescription != null) {
       if (partiallyFinishedTopicDescription.name().equals(topicName)) {
         // We've got unfinished description from previous invocation.
         
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
         continue;
       }
     } else {
       if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
         // This topic is part of the next cursor, so it may not have all 
partitions.
         partiallyFinishedTopicDescription = currentTopicDescription;
         continue;
       }
     }
   
   ```
   
   then do we need the nextTopicDescription?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+            TopicDescription nextTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Does this work when the cursor points to partition 0 of topic that's next to 
the results?  Can we add a unit test to test specifically for the case of 
partition 0?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to