artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191
##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##########
@@ -36,28 +38,38 @@
public class DescribeTopicsResult {
private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
+ private final Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>>
nameFuturesIterator;
@Deprecated
protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>>
futures) {
- this(null, futures);
+ this(null, futures, null);
}
// VisibleForTesting
- protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>>
topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
- if (topicIdFutures != null && nameFutures != null)
- throw new IllegalArgumentException("topicIdFutures and nameFutures
cannot both be specified.");
- if (topicIdFutures == null && nameFutures == null)
- throw new IllegalArgumentException("topicIdFutures and nameFutures
cannot both be null.");
+ protected DescribeTopicsResult(
+ Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures,
+ Map<String, KafkaFuture<TopicDescription>> nameFutures,
+ Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>>
nameFuturesIterator
+ ) {
+ if (topicIdFutures != null && nameFutures != null &&
nameFuturesIterator != null)
+ throw new IllegalArgumentException("topicIdFutures and nameFutures
and nameFutureIterator cannot both be specified.");
Review Comment:
nit: using "both" implies that there are 2 things; we now have 3.
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>>
handleDescribeTopicsByNames(f
}
}
final long now = time.milliseconds();
- Call call = new Call("describeTopics", calcDeadlineMs(now,
options.timeoutMs()),
- new LeastLoadedNodeProvider()) {
- private boolean supportsDisablingTopicCreation = true;
+ if (options.useDescribeTopicsApi()) {
+ RecurringCall call = new RecurringCall("DescribeTopics-Recurring",
calcDeadlineMs(now, options.timeoutMs()), runnable) {
Review Comment:
Didn't quite get how the recurring call framework enables lazy
generation. The RecurringCall.run() just runs the whole thing in one go.
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2129,63 +2167,183 @@ private Map<String, KafkaFuture<TopicDescription>>
handleDescribeTopicsByNames(f
}
}
final long now = time.milliseconds();
- Call call = new Call("describeTopics", calcDeadlineMs(now,
options.timeoutMs()),
- new LeastLoadedNodeProvider()) {
- private boolean supportsDisablingTopicCreation = true;
+ if (options.useDescribeTopicsApi()) {
+ RecurringCall call = new RecurringCall("DescribeTopics-Recurring",
calcDeadlineMs(now, options.timeoutMs()), runnable) {
Review Comment:
Looking at the current logic I cannot make sense out of it. For some reason
we get all topic names that fit under the batch size and then get all
partitions for those topics and put them in memory.
Why did we bother with all paging complexity if all we do is to get all
partitions for 2000 topics in memory? Can we just iterate topic by topic (as
it seems that we get the list of all topics anyway) and just get partitions for
each one?
With the paging, I would've expected that we would do something like this:
1. start iteration -- issue one call, once the first call is completed maybe
issue a few more (say 3) calls ahead for pipelining.
2. next -- iterate over current batch.
3. once the current batch is exhausted, switch to the next batch, pipeline
one more call.
This way if the caller finishes iteration early or if iteration is slow, we
won't keep piling results in memory.
##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts)
throws ExecutionException, I
} else {
ensureTopicExists(topics, opts.topic().orElse(""),
!opts.ifExists());
}
- List<org.apache.kafka.clients.admin.TopicDescription>
topicDescriptions = new ArrayList<>();
if (!topicIds.isEmpty()) {
Map<Uuid, org.apache.kafka.clients.admin.TopicDescription>
descTopics =
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
- topicDescriptions = new ArrayList<>(descTopics.values());
+ describeTopicsFollowUp(new ArrayList<>(descTopics.values()),
opts);
+ return;
}
if (!topics.isEmpty()) {
- Map<String, org.apache.kafka.clients.admin.TopicDescription>
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
- topicDescriptions = new ArrayList<>(descTopics.values());
+ final int partitionSizeLimit =
opts.partitionSizeLimitPerResponse().orElse(2000);
+ try {
+ Iterator<Map.Entry<String,
KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>>
descTopicIterator =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+ new
DescribeTopicsOptions().useDescribeTopicsApi(true)
+
.partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+ while (descTopicIterator.hasNext()) {
+ List<org.apache.kafka.clients.admin.TopicDescription>
topicDescriptions = new ArrayList<>();
+ for (int counter = 0; counter < partitionSizeLimit &&
descTopicIterator.hasNext();) {
+ org.apache.kafka.clients.admin.TopicDescription
topicDescription = descTopicIterator.next().getValue().get();
+ topicDescriptions.add(topicDescription);
+ counter += topicDescription.partitions().size();
+ }
+ describeTopicsFollowUp(topicDescriptions, opts);
+ }
+ } catch (Exception e) {
+ if (e.toString().contains("UnsupportedVersionException")) {
+ // Retry the request with Metadata API.
+ Map<String,
org.apache.kafka.clients.admin.TopicDescription> descTopics =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+ new
DescribeTopicsOptions().useDescribeTopicsApi(false)
Review Comment:
Does the partition size limit option get respected in this case?
##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts)
throws ExecutionException, I
} else {
ensureTopicExists(topics, opts.topic().orElse(""),
!opts.ifExists());
}
- List<org.apache.kafka.clients.admin.TopicDescription>
topicDescriptions = new ArrayList<>();
if (!topicIds.isEmpty()) {
Map<Uuid, org.apache.kafka.clients.admin.TopicDescription>
descTopics =
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
- topicDescriptions = new ArrayList<>(descTopics.values());
+ describeTopicsFollowUp(new ArrayList<>(descTopics.values()),
opts);
+ return;
}
if (!topics.isEmpty()) {
- Map<String, org.apache.kafka.clients.admin.TopicDescription>
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
- topicDescriptions = new ArrayList<>(descTopics.values());
+ final int partitionSizeLimit =
opts.partitionSizeLimitPerResponse().orElse(2000);
+ try {
+ Iterator<Map.Entry<String,
KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>>
descTopicIterator =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+ new
DescribeTopicsOptions().useDescribeTopicsApi(true)
+
.partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+ while (descTopicIterator.hasNext()) {
+ List<org.apache.kafka.clients.admin.TopicDescription>
topicDescriptions = new ArrayList<>();
+ for (int counter = 0; counter < partitionSizeLimit &&
descTopicIterator.hasNext();) {
+ org.apache.kafka.clients.admin.TopicDescription
topicDescription = descTopicIterator.next().getValue().get();
+ topicDescriptions.add(topicDescription);
+ counter += topicDescription.partitions().size();
+ }
+ describeTopicsFollowUp(topicDescriptions, opts);
Review Comment:
Will this properly handle cases when a topic with partition 0-42 is printed
in one iteration and then the same topic with partitions 43-77 is printed in
the next iteration?
##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##########
@@ -36,28 +38,38 @@
public class DescribeTopicsResult {
private final Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures;
private final Map<String, KafkaFuture<TopicDescription>> nameFutures;
+ private final Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>>
nameFuturesIterator;
@Deprecated
protected DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>>
futures) {
- this(null, futures);
+ this(null, futures, null);
}
// VisibleForTesting
- protected DescribeTopicsResult(Map<Uuid, KafkaFuture<TopicDescription>>
topicIdFutures, Map<String, KafkaFuture<TopicDescription>> nameFutures) {
- if (topicIdFutures != null && nameFutures != null)
- throw new IllegalArgumentException("topicIdFutures and nameFutures
cannot both be specified.");
- if (topicIdFutures == null && nameFutures == null)
- throw new IllegalArgumentException("topicIdFutures and nameFutures
cannot both be null.");
+ protected DescribeTopicsResult(
+ Map<Uuid, KafkaFuture<TopicDescription>> topicIdFutures,
+ Map<String, KafkaFuture<TopicDescription>> nameFutures,
+ Iterator<Map.Entry<String, KafkaFuture<TopicDescription>>>
nameFuturesIterator
+ ) {
+ if (topicIdFutures != null && nameFutures != null &&
nameFuturesIterator != null)
Review Comment:
If I understand the intent of this check correctly, it wants to check that
exactly one of the 3 is specified. This check would let any 2 out of 3 to be
specified.
##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts)
throws ExecutionException, I
} else {
ensureTopicExists(topics, opts.topic().orElse(""),
!opts.ifExists());
}
- List<org.apache.kafka.clients.admin.TopicDescription>
topicDescriptions = new ArrayList<>();
if (!topicIds.isEmpty()) {
Map<Uuid, org.apache.kafka.clients.admin.TopicDescription>
descTopics =
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
- topicDescriptions = new ArrayList<>(descTopics.values());
+ describeTopicsFollowUp(new ArrayList<>(descTopics.values()),
opts);
+ return;
}
if (!topics.isEmpty()) {
- Map<String, org.apache.kafka.clients.admin.TopicDescription>
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
- topicDescriptions = new ArrayList<>(descTopics.values());
+ final int partitionSizeLimit =
opts.partitionSizeLimitPerResponse().orElse(2000);
+ try {
+ Iterator<Map.Entry<String,
KafkaFuture<org.apache.kafka.clients.admin.TopicDescription>>>
descTopicIterator =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+ new
DescribeTopicsOptions().useDescribeTopicsApi(true)
+
.partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+ while (descTopicIterator.hasNext()) {
+ List<org.apache.kafka.clients.admin.TopicDescription>
topicDescriptions = new ArrayList<>();
+ for (int counter = 0; counter < partitionSizeLimit &&
descTopicIterator.hasNext();) {
+ org.apache.kafka.clients.admin.TopicDescription
topicDescription = descTopicIterator.next().getValue().get();
+ topicDescriptions.add(topicDescription);
+ counter += topicDescription.partitions().size();
+ }
+ describeTopicsFollowUp(topicDescriptions, opts);
+ }
+ } catch (Exception e) {
+ if (e.toString().contains("UnsupportedVersionException")) {
+ // Retry the request with Metadata API.
+ Map<String,
org.apache.kafka.clients.admin.TopicDescription> descTopics =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+ new
DescribeTopicsOptions().useDescribeTopicsApi(false)
+
.partitionSizeLimitPerResponse(opts.partitionSizeLimitPerResponse().orElse(2000))).allTopicNames().get();
+ describeTopicsFollowUp(new
ArrayList<>(descTopics.values()), opts);
+ return;
+ } else {
+ throw e;
+ }
+ }
+ return;
}
+ describeTopicsFollowUp(Collections.emptyList(), opts);
+ }
+
+ private void describeTopicsFollowUp(
Review Comment:
Looks like this function prints the topic descriptions. Should we rename
it to describe its purpose?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]