dengziming commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r562394749
##########
File path:
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1822,6 +1829,88 @@ void handleFailure(Throwable throwable) {
return new DescribeTopicsResult(new HashMap<>(topicFutures));
}
+ @Override
+ public DescribeTopicsResultWithIds describeTopicsWithIds(Collection<Uuid>
topicIds, DescribeTopicsOptions options) {
+
+ final Map<Uuid, KafkaFutureImpl<TopicDescription>> topicFutures = new
HashMap<>(topicIds.size());
+ final List<Uuid> topicIdsList = new ArrayList<>();
+ for (Uuid topicId : topicIds) {
+ if (topicIdIsUnrepresentable(topicId)) {
+ KafkaFutureImpl<TopicDescription> future = new
KafkaFutureImpl<>();
+ future.completeExceptionally(new InvalidTopicException("The
given topic id '" +
+ topicId + "' cannot be represented in a request."));
+ topicFutures.put(topicId, future);
+ } else if (!topicFutures.containsKey(topicId)) {
+ topicFutures.put(topicId, new KafkaFutureImpl<>());
+ topicIdsList.add(topicId);
+ }
+ }
+ final long now = time.milliseconds();
+ Call call = new Call("describeTopicsWithIds", calcDeadlineMs(now,
options.timeoutMs()),
+ new LeastLoadedNodeProvider()) {
+
+ @Override
+ MetadataRequest.Builder createRequest(int timeoutMs) {
+ return new MetadataRequest.Builder(new MetadataRequestData()
+
.setTopics(convertTopicIdsToMetadataRequestTopic(topicIdsList))
+ .setAllowAutoTopicCreation(false)
+
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
+ }
+
+ @Override
+ void handleResponse(AbstractResponse abstractResponse) {
+ MetadataResponse response = (MetadataResponse)
abstractResponse;
+ // Handle server responses for particular topics.
+ Cluster cluster = response.cluster();
+ Map<Uuid, Errors> errors = response.errorsByTopicId();
+ for (Map.Entry<Uuid, KafkaFutureImpl<TopicDescription>> entry
: topicFutures.entrySet()) {
+ Uuid topicId = entry.getKey();
+ KafkaFutureImpl<TopicDescription> future =
entry.getValue();
+
+ String topicName = cluster.topicName(topicId);
+ if (topicName == null) {
+ future.completeExceptionally(new
UnknownTopicIdException("TopicId " + topicId + " not found."));
+ continue;
+ }
+ Errors topicError = errors.get(topicId);
+ if (topicError != null) {
+ future.completeExceptionally(topicError.exception());
+ continue;
+ }
+
+ boolean isInternal =
cluster.internalTopics().contains(topicName);
+ List<PartitionInfo> partitionInfos =
cluster.partitionsForTopic(topicName);
+ List<TopicPartitionInfo> partitions = new
ArrayList<>(partitionInfos.size());
+ for (PartitionInfo partitionInfo : partitionInfos) {
+ TopicPartitionInfo topicPartitionInfo = new
TopicPartitionInfo(
+ partitionInfo.partition(),
leader(partitionInfo), Arrays.asList(partitionInfo.replicas()),
+ Arrays.asList(partitionInfo.inSyncReplicas()));
+ partitions.add(topicPartitionInfo);
+ }
+
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
+ TopicDescription topicDescription = new
TopicDescription(topicName, isInternal, partitions,
+
validAclOperations(response.topicAuthorizedOperations(topicName).get()),
topicId);
+ future.complete(topicDescription);
Review comment:
Thank you, I added a getTopicDescriptionFromCluster to do this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]