dengziming commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r556176649



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1816,7 +1823,103 @@ void handleFailure(Throwable throwable) {
         if (!topicNamesList.isEmpty()) {
             runnable.call(call, now);
         }
-        return new DescribeTopicsResult(new HashMap<>(topicFutures));
+        return new DescribeTopicsResult<>(new HashMap<>(topicFutures));
+    }
+
+    @Override
+    public DescribeTopicsResult<Uuid> describeTopicsWithIds(Collection<Uuid> 
topicIds, DescribeTopicsOptions options) {
+
+        final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new 
HashMap<>(topicIds.size());
+        final List<Uuid> topicIdsList = new ArrayList<>();
+        for (Uuid topicId : topicIds) {
+            if (topicIdIsUnrepresentable(topicId)) {
+                KafkaFutureImpl<TopicDescription> future = new 
KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The 
given topic id '" +
+                        topicId + "' cannot be represented in a request."));
+                topicFutures.put(topicId, future);
+            } else if (!topicFutures.containsKey(topicId)) {
+                topicFutures.put(topicId, new KafkaFutureImpl<>());
+                topicIdsList.add(topicId);
+            }
+        }
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, 
options.timeoutMs()),
+                new LeastLoadedNodeProvider()) {
+
+            private boolean supportsDisablingTopicCreation = true;
+
+            @Override
+            MetadataRequest.Builder createRequest(int timeoutMs) {
+                if (supportsDisablingTopicCreation)
+                    return new MetadataRequest.Builder(new 
MetadataRequestData()
+                            
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+                            .setAllowAutoTopicCreation(false)
+                            
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+                else
+                    return MetadataRequest.Builder.allTopics();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse response = (MetadataResponse) 
abstractResponse;
+                // Handle server responses for particular topics.
+                Cluster cluster = response.cluster();
+                Map<String, Errors> errors = response.errors();
+                for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry 
: topicFutures.entrySet()) {
+                    Uuid topicId = entry.getKey();
+                    KafkaFutureImpl<TopicDescription> future = 
entry.getValue();
+
+                    String topicName = cluster.topicName(topicId);
+                    if (topicName == null) {
+                        future.completeExceptionally(new 
UnknownTopicIdException("TopicId " + topicId + " not found."));
+                        continue;
+                    }
+                    Errors topicError = errors.get(topicName);
+                    if (topicError != null) {
+                        future.completeExceptionally(topicError.exception());
+                        continue;
+                    }
+
+                    boolean isInternal = 
cluster.internalTopics().contains(topicName);
+                    List<PartitionInfo> partitionInfos = 
cluster.partitionsForTopic(topicName);
+                    List<TopicPartitionInfo> partitions = new 
ArrayList<>(partitionInfos.size());
+                    for (PartitionInfo partitionInfo : partitionInfos) {
+                        TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(
+                                partitionInfo.partition(), 
leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+                                Arrays.asList(partitionInfo.inSyncReplicas()));
+                        partitions.add(topicPartitionInfo);
+                    }
+                    
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+                    TopicDescription topicDescription = new 
TopicDescription(topicName, topicId, isInternal, partitions,
+                            
validAclOperations(response.topicAuthorizedOperations(topicName).get()));
+                    future.complete(topicDescription);
+                }
+            }
+
+            private Node leader(PartitionInfo partitionInfo) {
+                if (partitionInfo.leader() == null || 
partitionInfo.leader().id() == Node.noNode().id())
+                    return null;
+                return partitionInfo.leader();
+            }
+
+            @Override
+            boolean 
handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                if (supportsDisablingTopicCreation) {
+                    supportsDisablingTopicCreation = false;

Review comment:
       Sorry, I'm just removing that code and using the default implementation.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to