CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = 
new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+            TopicDescription nextTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {
+                    request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                        .setTopicName(partiallyFinishedTopicDescription.name())
+                        
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                    );
+                }
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+                for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = 
topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+                        
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        continue;
+                    }
+
+                    if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+                        // This is the first time this topic being pointed by 
the cursor.
+                        nextTopicDescription = currentTopicDescription;
+                        continue;
+                    }

Review Comment:
   The problem is that in the same batch of results, we can have both some 
partitions for a topic partially finished and for a new topic pointed by the 
cursor. If we want to close the future for the partially finished after the 
loop, we have to store the cursor topic partitions 
somewhere(nextTopicDescription) during the loop.
   However, it may be easier if we can close the future during the loop because 
the API guarantees that the partially finished will be the first in the result, 
and the cursor topic shows at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to