dengziming commented on a change in pull request #9769: URL: https://github.com/apache/kafka/pull/9769#discussion_r556176649
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -1816,7 +1823,103 @@ void handleFailure(Throwable throwable) { if (!topicNamesList.isEmpty()) { runnable.call(call, now); } - return new DescribeTopicsResult(new HashMap<>(topicFutures)); + return new DescribeTopicsResult<>(new HashMap<>(topicFutures)); + } + + @Override + public DescribeTopicsResult<Uuid> describeTopicsWithIds(Collection<Uuid> topicIds, DescribeTopicsOptions options) { + + final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicIds.size()); + final List<Uuid> topicIdsList = new ArrayList<>(); + for (Uuid topicId : topicIds) { + if (topicIdIsUnrepresentable(topicId)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic id '" + + topicId + "' cannot be represented in a request.")); + topicFutures.put(topicId, future); + } else if (!topicFutures.containsKey(topicId)) { + topicFutures.put(topicId, new KafkaFutureImpl<>()); + topicIdsList.add(topicId); + } + } + final long now = time.milliseconds(); + Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + + private boolean supportsDisablingTopicCreation = true; + + @Override + MetadataRequest.Builder createRequest(int timeoutMs) { + if (supportsDisablingTopicCreation) + return new MetadataRequest.Builder(new MetadataRequestData() + .setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList)) + .setAllowAutoTopicCreation(false) + .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); + else + return MetadataRequest.Builder.allTopics(); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + MetadataResponse response = (MetadataResponse) abstractResponse; + // Handle server responses for particular topics. 
+ Cluster cluster = response.cluster(); + Map<String, Errors> errors = response.errors(); + for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { + Uuid topicId = entry.getKey(); + KafkaFutureImpl<TopicDescription> future = entry.getValue(); + + String topicName = cluster.topicName(topicId); + if (topicName == null) { + future.completeExceptionally(new UnknownTopicIdException("TopicId " + topicId + " not found.")); + continue; + } + Errors topicError = errors.get(topicName); + if (topicError != null) { + future.completeExceptionally(topicError.exception()); + continue; + } + + boolean isInternal = cluster.internalTopics().contains(topicName); + List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName); + List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size()); + for (PartitionInfo partitionInfo : partitionInfos) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), + Arrays.asList(partitionInfo.inSyncReplicas())); + partitions.add(topicPartitionInfo); + } + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); + TopicDescription topicDescription = new TopicDescription(topicName, topicId, isInternal, partitions, + validAclOperations(response.topicAuthorizedOperations(topicName).get())); + future.complete(topicDescription); + } + } + + private Node leader(PartitionInfo partitionInfo) { + if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id()) + return null; + return partitionInfo.leader(); + } + + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + if (supportsDisablingTopicCreation) { + supportsDisablingTopicCreation = false; Review comment: Sorry, I am just removing that code and using the default implementation.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org