mumrah commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1489713255
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { - private boolean supportsDisablingTopicCreation = true; + if (options.useDescribeTopicsApi()) { + RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { + Map<String, TopicRequest> pendingTopics = + topicNames.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new) + ); - @Override - MetadataRequest.Builder createRequest(int timeoutMs) { - if (supportsDisablingTopicCreation) - return new MetadataRequest.Builder(new MetadataRequestData() - .setTopics(convertToMetadataRequestTopic(topicNamesList)) - .setAllowAutoTopicCreation(false) - .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); - else - return MetadataRequest.Builder.allTopics(); - } + String partiallyFinishedTopicName = ""; + int partiallyFinishedTopicNextPartitionId = -1; + TopicDescription partiallyFinishedTopicDescription = null; - @Override - void handleResponse(AbstractResponse abstractResponse) { - MetadataResponse response = (MetadataResponse) abstractResponse; - // Handle server responses for particular topics. 
- Cluster cluster = response.buildCluster(); - Map<String, Errors> errors = response.errors(); - for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { - String topicName = entry.getKey(); - KafkaFutureImpl<TopicDescription> future = entry.getValue(); - Errors topicError = errors.get(topicName); - if (topicError != null) { - future.completeExceptionally(topicError.exception()); - continue; + @Override + Call generateCall() { + return new Call("describeTopics", this.deadlineMs, new LeastLoadedNodeProvider()) { + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(pendingTopics.values().stream().collect(Collectors.toList())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (!partiallyFinishedTopicName.isEmpty()) { + request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() + .setTopicName(partiallyFinishedTopicName) + .setPartitionIndex(partiallyFinishedTopicNextPartitionId) + ); + } + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + String cursorTopicName = ""; + int cursorPartitionId = -1; + if (response.data().nextCursor() != null) { + DescribeTopicPartitionsResponseData.Cursor cursor = response.data().nextCursor(); + cursorTopicName = cursor.topicName(); + cursorPartitionId = cursor.partitionIndex(); + } + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + topicFutures.remove(topicName); + 
pendingTopics.remove(topicName); + if (partiallyFinishedTopicName.equals(topicName)) { + partiallyFinishedTopicName = ""; + partiallyFinishedTopicNextPartitionId = -1; + partiallyFinishedTopicDescription = null; + } + if (cursorTopicName.equals(topicName)) { + cursorTopicName = ""; + cursorPartitionId = -1; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic); + + if (partiallyFinishedTopicName.equals(topicName)) { + if (partiallyFinishedTopicDescription == null) { + partiallyFinishedTopicDescription = currentTopicDescription; + } else { + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + } + + if (!cursorTopicName.equals(topicName)) { + pendingTopics.remove(topicName); + future.complete(partiallyFinishedTopicDescription); + partiallyFinishedTopicDescription = null; + } + continue; + } + + if (cursorTopicName.equals(topicName)) { + partiallyFinishedTopicDescription = currentTopicDescription; + continue; + } + + pendingTopics.remove(topicName); + future.complete(currentTopicDescription); + } + partiallyFinishedTopicName = cursorTopicName; + partiallyFinishedTopicNextPartitionId = cursorPartitionId; + if (pendingTopics.isEmpty()) { + handleNonExistingTopics(); + nextRun.complete(false); + } else { + nextRun.complete(true); + } + } + + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + return false; + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(topicFutures.values(), throwable); + nextRun.completeExceptionally(throwable); + } + }; + } + + void handleNonExistingTopics() { + for (Map.Entry<String, KafkaFutureImpl<TopicDescription>> entry : topicFutures.entrySet()) { + if (!entry.getValue().isDone()) { + entry.getValue().completeExceptionally(new UnknownTopicOrPartitionException("Topic " + entry.getKey() + " not found.")); + } } - if 
(!cluster.topics().contains(topicName)) { - future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + topicName + " not found.")); - continue; + } + }; + call.run(); Review Comment: KafkaAdminClient is used by custom applications as well as our command line tools. We should make sure we provide an API that works for both. I think the most common use case is just iterating through the results once (like we do in `kafka-topics.sh`). For this use case, pagination as implemented here helps with limiting the request size and associated buffers, but you're right that it doesn't help with the total memory required to hold the full response in the application. To avoid loading everything into memory at once, I can think of a few options: 1) Make the entire map lazily loaded. No calls are issued until `get` is called on one of the `KafkaFuture<TopicDescription>` values. Memory usage increases as you traverse deeper into the map. This is a half-solution: memory still grows to the full response size if the caller reads every entry. 2) An extension of option 1, but we empty the KafkaFuture after it is read. This way, the map is never fully populated. This would fix the memory problem, but it breaks the Future contract, and so is probably a bad idea. 3) Define a new KafkaAdminClient method that returns an Iterator or Stream. Option 3 seems best to me. Thoughts, @artemlivshits, @CalvinConfluent? ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { - private boolean supportsDisablingTopicCreation = true; + if (options.useDescribeTopicsApi()) { + RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: I would actually like to make these subsequent calls lazily. 
A caller may issue a request that needs several pages of results but never actually iterate through them, in which case eagerly fetching every page would be wasted work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org