CalvinConfluent commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings({"MethodLength", "NPathComplexity"}) + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. + DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e.getCause()); + return new HashMap<>(topicFutures); + } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + TopicDescription partiallyFinishedTopicDescription = null; + TopicDescription nextTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (partiallyFinishedTopicDescription != null) { + request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() + .setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) + ); + } + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + pendingTopics.remove(topicName); + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + responseCursor = null; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + + if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + continue; + } + + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + // This is the first time this topic being pointed by the cursor. + nextTopicDescription = currentTopicDescription; + continue; + } Review Comment: The problem is that in the same batch of results, we can have both some partitions for a topic partially finished and for a new topic pointed by the cursor. If we want to close the future for the partially finished after the loop, we have to store the cursor topic partitions somewhere(nextTopicDescription) during the loop. However, it may be easier if we can close the future during the loop because the API guarantees that the partially finished will be the first in the result, and the cursor topic shows at the end. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org