dengziming commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562394749



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) {
         return new DescribeTopicsResult(new HashMap<>(topicFutures));
     }
 
+    @Override
+    public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid> 
topicIds, DescribeTopicsOptions options) {
+
+        final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new 
HashMap<>(topicIds.size());
+        final List<Uuid> topicIdsList = new ArrayList<>();
+        for (Uuid topicId : topicIds) {
+            if (topicIdIsUnrepresentable(topicId)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic id '" +
+                        topicId + "' cannot be represented in a request."));
+                topicFutures.put(topicId, future);
+            } else if (!topicFutures.containsKey(topicId)) {
+                topicFutures.put(topicId, new KafkaFutureImpl<>());
+                topicIdsList.add(topicId);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, 
options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                return new MetadataRequest.Builder(new MetadataRequestData()
+                        
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                        .setAllowAutoTopicCreation(false)
+                        
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) 
abstractResponse;
+                // Handle server responses for particular topics.
+                Cluster cluster = response.cluster();
+                Map<Uuid, Errors> errors = response.errorsByTopicId();
+                for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry 
: topicFutures.entrySet()) {
+                    Uuid topicId = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = 
entry.getValue();
+
+                    String topicName = cluster.topicName(topicId);
+                    if (topicName == null) {
+                        future.completeExceptionally(new 
UnknownTopicIdException("TopicId " + topicId + " not found."));
+                        continue;
+                    }
+                    Errors topicError = errors.get(topicId);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+
+                    boolean isInternal = 
cluster.internalTopics().contains(topicName);
+                    List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic(topicName);
+                    List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(
+                                partitionInfo.partition(), 
leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                                Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.add(topicPartitionInfo);
+                    }
+                    
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                    TopicDescription topicDescription = new 
TopicDescription(topicName, isInternal, partitions,
+                            
validAclOperations(response.topicAuthorizedOperations(topicName).get()), 
topicId);
+                    future.complete(topicDescription);

Review comment:
       Thank you, I added a getTopicDescriptionFromCluster method to do this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to