artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1491416954
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { - private boolean supportsDisablingTopicCreation = true; + if (options.useDescribeTopicsApi()) { + RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { + Map<String, TopicRequest> pendingTopics = + topicNames.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new) + ); - @Override - MetadataRequest.Builder createRequest(int timeoutMs) { - if (supportsDisablingTopicCreation) - return new MetadataRequest.Builder(new MetadataRequestData() - .setTopics(convertToMetadataRequestTopic(topicNamesList)) - .setAllowAutoTopicCreation(false) - .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); - else - return MetadataRequest.Builder.allTopics(); - } + String partiallyFinishedTopicName = ""; + int partiallyFinishedTopicNextPartitionId = -1; + TopicDescription partiallyFinishedTopicDescription = null; - @Override - void handleResponse(AbstractResponse abstractResponse) { - MetadataResponse response = (MetadataResponse) abstractResponse; - // Handle server responses for particular topics. 
- Cluster cluster = response.buildCluster(); - Map<String, Errors> errors = response.errors(); - for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { - String topicName = entry.getKey(); - KafkaFutureImpl<TopicDescription> future = entry.getValue(); - Errors topicError = errors.get(topicName); - if (topicError != null) { - future.completeExceptionally(topicError.exception()); - continue; + @Override + Call generateCall() { + return new Call("describeTopics", this.deadlineMs, new LeastLoadedNodeProvider()) { + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(pendingTopics.values().stream().collect(Collectors.toList())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (!partiallyFinishedTopicName.isEmpty()) { + request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() + .setTopicName(partiallyFinishedTopicName) + .setPartitionIndex(partiallyFinishedTopicNextPartitionId) + ); + } + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + String cursorTopicName = ""; + int cursorPartitionId = -1; + if (response.data().nextCursor() != null) { + DescribeTopicPartitionsResponseData.Cursor cursor = response.data().nextCursor(); + cursorTopicName = cursor.topicName(); + cursorPartitionId = cursor.partitionIndex(); + } + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + topicFutures.remove(topicName); + 
pendingTopics.remove(topicName); + if (partiallyFinishedTopicName.equals(topicName)) { + partiallyFinishedTopicName = ""; + partiallyFinishedTopicNextPartitionId = -1; + partiallyFinishedTopicDescription = null; + } + if (cursorTopicName.equals(topicName)) { + cursorTopicName = ""; + cursorPartitionId = -1; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic); + + if (partiallyFinishedTopicName.equals(topicName)) { + if (partiallyFinishedTopicDescription == null) { + partiallyFinishedTopicDescription = currentTopicDescription; + } else { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + } + + if (!cursorTopicName.equals(topicName)) { + pendingTopics.remove(topicName); + future.complete(partiallyFinishedTopicDescription); + partiallyFinishedTopicDescription = null; + } + continue; + } + + if (cursorTopicName.equals(topicName)) { + partiallyFinishedTopicDescription = currentTopicDescription; + continue; + } + + pendingTopics.remove(topicName); + future.complete(currentTopicDescription); + } + partiallyFinishedTopicName = cursorTopicName; + partiallyFinishedTopicNextPartitionId = cursorPartitionId; + if (pendingTopics.isEmpty()) { + handleNonExistingTopics(); + nextRun.complete(false); + } else { + nextRun.complete(true); + } + } + + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + return false; + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(topicFutures.values(), throwable); + nextRun.completeExceptionally(throwable); + } + }; + } + + void handleNonExistingTopics() { + for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { + if (!entry.getValue().isDone()) { + entry.getValue().completeExceptionally(new UnknownTopicOrPartitionException("Topic " + entry.getKey() + " not found.")); + } } - if 
(!cluster.topics().contains(topicName)) { - future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found.")); - continue; + } + }; + call.run(); Review Comment: I agree that option 3 would provide the most intuitive interface; I think options 1 and 2 would hide streaming semantics behind an interface that is generally expected to be "in memory". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org