artemlivshits commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1523702932
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings({"MethodLength", "NPathComplexity"}) + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. + DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e.getCause()); + return new HashMap<>(topicFutures); + } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + TopicDescription partiallyFinishedTopicDescription = null; + TopicDescription nextTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (partiallyFinishedTopicDescription != null) { Review Comment: Ideally we should drive the pagination from the cursor, rather than spreading the logic between multiple states. We plan to get rid of topic list eventually and this logic would be easy to miss. ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings({"MethodLength", "NPathComplexity"}) + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. + DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e.getCause()); + return new HashMap<>(topicFutures); + } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + TopicDescription partiallyFinishedTopicDescription = null; + TopicDescription nextTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (partiallyFinishedTopicDescription != null) { + request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() + .setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) + ); + } + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + pendingTopics.remove(topicName); + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + responseCursor = null; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + + if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + continue; + } + + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + // This is the first time this topic being pointed by the cursor. + nextTopicDescription = currentTopicDescription; + continue; + } Review Comment: Ok, sounds good. Can we add a comment that explains what you've just said above? Also seems like nextTopicDescription can be just a local in the function, not a member in the class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org