artemlivshits commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191
########## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java: ########## @@ -36,28 +38,38 @@ public class DescribeTopicsResult { private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures; private final Map<String, KafkaFuture<TopicDescription>> nameFutures; + private final Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator; @Deprecated protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) { - this(null, futures); + this(null, futures, null); } // VisibleForTesting - protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) { - if (topicIdFutures != null && nameFutures != null) - throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); - if (topicIdFutures == null && nameFutures == null) - throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); + protected DescribeTopicsResult( + Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, + Map<String, KafkaFuture<TopicDescription>> nameFutures, + Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator + ) { + if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null) + throw new IllegalArgumentException("topicIdFutures and nameFutures and nameFutureIterator cannot both be specified."); Review Comment: nit: using "both" implies that there are 2 things, we now have 3. 
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { - private boolean supportsDisablingTopicCreation = true; + if (options.useDescribeTopicsApi()) { + RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: Didn't quite get how the recurring call framework enables lazy generation? The RecurringCall.run() just runs the whole thing in one go. ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); - Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), - new LeastLoadedNodeProvider()) { - private boolean supportsDisablingTopicCreation = true; + if (options.useDescribeTopicsApi()) { + RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { Review Comment: Looking at the current logic I cannot make sense out of it. For some reason we get all topic names that fit under the batch size and then get all partitions for those topics and put them in memory. Why did we bother with all paging complexity if all we do is to get all partitions for 2000 topics in memory? Can we just iterate topic by topic (as it seems that we get the list of all topics anyway) and just get partitions for each one? With the paging, I would've expected that we did something like this: 1. start iteration -- issue one call, once the first call is completed maybe issue a few more (say 3) calls ahead for pipelining. 2. 
next -- iterate over current batch. 3. once the current batch is exhausted, switch to the next batch, pipeline one more call. This way if the caller finishes iteration early or if iteration is slow, we won't keep piling results in memory. ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } - List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); - topicDescriptions = new ArrayList<>(descTopics.values()); + describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); + return; } if (!topics.isEmpty()) { - Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); - topicDescriptions = new ArrayList<>(descTopics.values()); + final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); + try { + Iterator<Map.Entry<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>> descTopicIterator = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), + new DescribeTopicsOptions().useDescribeTopicsApi(true) + .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator(); + while (descTopicIterator.hasNext()) { + List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); + for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) { + org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get(); + topicDescriptions.add(topicDescription); + counter += 
topicDescription.partitions().size(); + } + describeTopicsFollowUp(topicDescriptions, opts); + } + } catch (Exception e) { + if (e.toString().contains("UnsupportedVersionException")) { + // Retry the request with Metadata API. + Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), + new DescribeTopicsOptions().useDescribeTopicsApi(false) Review Comment: Does the partition size limit option get respected in this case? ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } - List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); - topicDescriptions = new ArrayList<>(descTopics.values()); + describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); + return; } if (!topics.isEmpty()) { - Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); - topicDescriptions = new ArrayList<>(descTopics.values()); + final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); + try { + Iterator<Map.Entry<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>> descTopicIterator = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), + new DescribeTopicsOptions().useDescribeTopicsApi(true) + .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator(); + while (descTopicIterator.hasNext()) { + List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); + for 
(int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) { + org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get(); + topicDescriptions.add(topicDescription); + counter += topicDescription.partitions().size(); + } + describeTopicsFollowUp(topicDescriptions, opts); Review Comment: Will this properly handle cases when a topic with partition 0-42 is printed in one iteration and then the same topic with partitions 43-77 is printed in the next iteration? ########## clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java: ########## @@ -36,28 +38,38 @@ public class DescribeTopicsResult { private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures; private final Map<String, KafkaFuture<TopicDescription>> nameFutures; + private final Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator; @Deprecated protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) { - this(null, futures); + this(null, futures, null); } // VisibleForTesting - protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) { - if (topicIdFutures != null && nameFutures != null) - throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be specified."); - if (topicIdFutures == null && nameFutures == null) - throw new IllegalArgumentException("topicIdFutures and nameFutures cannot both be null."); + protected DescribeTopicsResult( + Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures, + Map<String, KafkaFuture<TopicDescription>> nameFutures, + Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>> nameFuturesIterator + ) { + if (topicIdFutures != null && nameFutures != null && nameFuturesIterator != null) Review Comment: If I understand the intent of this check correctly, it wants to check that exactly one of the 3 is specified. 
This check would let any 2 out of 3 be specified. ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I } else { ensureTopicExists(topics, opts.topic().orElse(""), !opts.ifExists()); } - List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); if (!topicIds.isEmpty()) { Map<Uuid, org.apache.kafka.clients.admin.TopicDescription> descTopics = adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get(); - topicDescriptions = new ArrayList<>(descTopics.values()); + describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); + return; } if (!topics.isEmpty()) { - Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics = - adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get(); - topicDescriptions = new ArrayList<>(descTopics.values()); + final int partitionSizeLimit = opts.partitionSizeLimitPerResponse().orElse(2000); + try { + Iterator<Map.Entry<String, KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>> descTopicIterator = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), + new DescribeTopicsOptions().useDescribeTopicsApi(true) + .partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator(); + while (descTopicIterator.hasNext()) { + List<org.apache.kafka.clients.admin.TopicDescription> topicDescriptions = new ArrayList<>(); + for (int counter = 0; counter < partitionSizeLimit && descTopicIterator.hasNext();) { + org.apache.kafka.clients.admin.TopicDescription topicDescription = descTopicIterator.next().getValue().get(); + topicDescriptions.add(topicDescription); + counter += topicDescription.partitions().size(); + } + describeTopicsFollowUp(topicDescriptions, opts); + } + } catch (Exception e) { + if (e.toString().contains("UnsupportedVersionException")) { + // Retry the 
request with Metadata API. + Map<String, org.apache.kafka.clients.admin.TopicDescription> descTopics = + adminClient.describeTopics(TopicCollection.ofTopicNames(topics), + new DescribeTopicsOptions().useDescribeTopicsApi(false) + .partitionSizeLimitPerResponse(opts.partitionSizeLimitPerResponse().orElse(2000))).allTopicNames().get(); + describeTopicsFollowUp(new ArrayList<>(descTopics.values()), opts); + return; + } else { + throw e; + } + } + return; } + describeTopicsFollowUp(Collections.emptyList(), opts); + } + + private void describeTopicsFollowUp( Review Comment: Looks like this function prints the topic descriptions. Should we name it to describe its purpose? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org