kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506629098


##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -335,6 +335,21 @@ default DescribeTopicsResult 
describeTopics(TopicCollection topics) {
      */
     DescribeTopicsResult describeTopics(TopicCollection topics, 
DescribeTopicsOptions options);
 
+    /**
+     * Describe some topics in the cluster.
+     *
+     * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.
+     *
+     * @param topics  The topics to describe.
+     * @param options The options to use when describing the topics.
+     * @param subscriber The subscriber to consumer the results.
+     */
+    default void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+    };

Review Comment:
   Just so I understand, is the default of a no-op intentional?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -30,6 +30,7 @@
 public class DescribeTopicsOptions extends 
AbstractOptions<DescribeTopicsOptions> {
 
     private boolean includeAuthorizedOperations;
+    private int partitionSizeLimitPerResponse;

Review Comment:
   Is there an upper limit (besides Integer.MAX_SIZE 😏)?



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -367,6 +372,14 @@ public void printDescription() {
                     .map(node -> node.toString())
                     .collect(Collectors.joining(",")));
             }
+
+            System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream()

Review Comment:
   ```suggestion
               System.out.print("\tELRs: " + 
info.eligibleLeaderReplicas().stream()
   ```



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -367,6 +372,14 @@ public void printDescription() {
                     .map(node -> node.toString())
                     .collect(Collectors.joining(",")));
             }
+
+            System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream()
+                .map(node -> Integer.toString(node.id()))
+                .collect(Collectors.joining(",")));
+            System.out.print("\tLastKnownElr: " + 
info.lastKnownEligibleLeaderReplicas().stream()

Review Comment:
   ```suggestion
               System.out.print("\tLast Known ELRs: " + 
info.lastKnownEligibleLeaderReplicas().stream()
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -335,6 +335,21 @@ default DescribeTopicsResult 
describeTopics(TopicCollection topics) {
      */
     DescribeTopicsResult describeTopics(TopicCollection topics, 
DescribeTopicsOptions options);
 
+    /**
+     * Describe some topics in the cluster.
+     *
+     * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.

Review Comment:
   What is the behavior when running sub-3.1.0?



##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicPartitionsResult.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Collections;
+
+public class DescribeTopicPartitionsResult {
+    final public TopicDescription topicDescription;
+    final public Exception exception;

Review Comment:
   Have we considered using `Optional<Exception>` to lessen the chance of 
spurious NPEs?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,37 @@ public boolean isInternal() {
         }
     }
 
+    abstract class RecurringCall {
+        private final String name;
+        final long deadlineMs;
+        private final AdminClientRunnable runnable;
+        KafkaFutureImpl<Boolean> nextRun;
+        abstract Call generateCall();
+
+        public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+            this.name = name;
+            this.deadlineMs = deadlineMs;
+            this.runnable = runnable;
+        }
+
+        public String toString() {
+            return "RecurringCall(name=" + name + ", deadlineMs=" + deadlineMs 
+ ")";
+        }
+
+        public void run() {
+            try {
+                do {
+                    nextRun = new KafkaFutureImpl<>();
+                    Call call = generateCall();
+                    runnable.call(call, time.milliseconds());
+                } while (nextRun.get());
+            } catch (Exception e) {
+                log.info("Stop the recurring call " + name + " because " + e);
+                e.printStackTrace();

Review Comment:
   ```suggestion
                   log.info("Stopping the recurring call " + name + " due to an 
error", e);
   ```
   
   Feel free to change the wording, but we probably want the stack trace to go 
through the logging subsystem.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {

Review Comment:
   ```suggestion
           } else if (!(topics instanceof TopicNameCollection)) {
   ```
   
   ?



##########
clients/src/main/java/org/apache/kafka/clients/admin/AdminResultsSubscriber.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A subscriber interface for querying API results with pagination.
+ */
+public interface AdminResultsSubscriber<T> {
+
+    // Being called when there is no more subscribed content.
+    void onComplete();
+
+    // Being called when the publisher hits unrecoverable errors.
+    void onError(Exception e);
+
+    // The publisher feeds the next result.
+    void onNext(T result);
+
+    // Initiate the subscriber.
+    void run();

Review Comment:
   Is this implying it's a separate thread of execution or something?



##########
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##########
@@ -335,6 +335,21 @@ default DescribeTopicsResult 
describeTopics(TopicCollection topics) {
      */
     DescribeTopicsResult describeTopics(TopicCollection topics, 
DescribeTopicsOptions options);
 
+    /**
+     * Describe some topics in the cluster.
+     *
+     * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.
+     *
+     * @param topics  The topics to describe.
+     * @param options The options to use when describing the topics.
+     * @param subscriber The subscriber to consumer the results.
+     */
+    default void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+    };

Review Comment:
   ```suggestion
       }
   ```



##########
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##########
@@ -79,9 +95,24 @@ public List<Node> isr() {
         return isr;
     }
 
+    /**
+     * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+     */
+    public List<Node> eligibleLeaderReplicas() {
+        return elr;
+    }
+
+    /**
+     * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+     */
+    public List<Node> lastKnownEligibleLeaderReplicas() {
+        return lastKnownElr;
+    }
+
     public String toString() {
         return "(partition=" + partition + ", leader=" + leader + ", 
replicas=" +
-            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
")";
+            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+            ", elr=" +Utils.join(elr, ", ") + ", lastKnownElr=" + 
Utils.join(lastKnownElr, ", ") + ")";

Review Comment:
   Just the nitiest of picks...
   
   ```suggestion
               ", elr=" + Utils.join(elr, ", ") + ", lastKnownElr=" + 
Utils.join(lastKnownElr, ", ") + ")";
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(
+            "DescribeTopics-Recurring",
+            calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+            runnable
+        ) {
+
+            Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+            Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;
+
+            @Override
+            Call generateCall() {
+                return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+                    @Override
+                    DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                        DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                            
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+                        if (!partiallyFinishedTopicName.isEmpty()) {
+                            request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                                .setTopicName(partiallyFinishedTopicName)
+                                
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                            );
+                        }
+
+                        for (int ii = pendingTopics.size(); ii < 
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext(); 
++ii) {
+                            String topicName = pendingTopicIterator.next();
+                            pendingTopics.put(topicName, new 
TopicRequest().setName(topicName));
+                        }
+                        
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));

Review Comment:
   This is a hint more concise, but arguably just preference:
   
   ```suggestion
                           request.setTopics(new 
ArrayList<>pendingTopics.values());
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {

Review Comment:
   ```suggestion
       public void describeTopics(
           TopicNameCollection topics,
           DescribeTopicsOptions options,
           AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
   ```
   
   This makes more sense in that `TopicNameCollection` is really all we support 
for this release. We could later generalize the API method signature to accept 
`TopicIdCollection` once support is added, and it shouldn't break any existing 
usage.



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(
+            "DescribeTopics-Recurring",
+            calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+            runnable
+        ) {
+
+            Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+            Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;

Review Comment:
   Could we replace this with a DescribeTopicPartitionsRequestData.Cursor 
instead of the two separate variables? It makes its purpose a bit more clear 
and we don't have to be worried about them getting out of sync.
   
   ```suggestion
               DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(
+            "DescribeTopics-Recurring",
+            calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+            runnable
+        ) {
+
+            Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+            Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;
+
+            @Override
+            Call generateCall() {
+                return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+                    @Override
+                    DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                        DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                            
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+                        if (!partiallyFinishedTopicName.isEmpty()) {
+                            request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                                .setTopicName(partiallyFinishedTopicName)
+                                
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                            );

Review Comment:
   ```suggestion
                               request.setCursor(requestCursor);
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2108,9 +2147,9 @@ void handleFailure(Throwable throwable) {
     public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
         if (topics instanceof TopicIdCollection)
             return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
-        else if (topics instanceof TopicNameCollection)
+        else if (topics instanceof TopicNameCollection) {
             return 
DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection)
 topics).topicNames(), options));
-        else
+        } else

Review Comment:
   Does including the brackets help here?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(
+            "DescribeTopics-Recurring",
+            calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+            runnable
+        ) {
+
+            Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+            Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;
+
+            @Override
+            Call generateCall() {
+                return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+                    @Override
+                    DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                        DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                            
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+                        if (!partiallyFinishedTopicName.isEmpty()) {
+                            request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                                .setTopicName(partiallyFinishedTopicName)
+                                
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                            );
+                        }
+
+                        for (int ii = pendingTopics.size(); ii < 
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext(); 
++ii) {
+                            String topicName = pendingTopicIterator.next();
+                            pendingTopics.put(topicName, new 
TopicRequest().setName(topicName));
+                        }
+                        
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
+
+                        return new 
DescribeTopicPartitionsRequest.Builder(request);
+                    }
+
+                    @Override
+                    void handleResponse(AbstractResponse abstractResponse) {
+                        DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                        String cursorTopicName = "";
+                        int cursorPartitionId = -1;

Review Comment:
   Same idea as above, but with the response cursor:
   
   ```suggestion
                           DescribeTopicPartitionsResponseData.Cursor 
responseCursor = null;
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(
+            "DescribeTopics-Recurring",
+            calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+            runnable
+        ) {
+
+            Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+            Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;
+
+            @Override
+            Call generateCall() {
+                return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+                    @Override
+                    DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                        DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                            
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+                        if (!partiallyFinishedTopicName.isEmpty()) {
+                            request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                                .setTopicName(partiallyFinishedTopicName)
+                                
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                            );
+                        }
+
+                        for (int ii = pendingTopics.size(); ii < 
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext(); 
++ii) {
+                            String topicName = pendingTopicIterator.next();
+                            pendingTopics.put(topicName, new 
TopicRequest().setName(topicName));
+                        }
+                        
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
+
+                        return new 
DescribeTopicPartitionsRequest.Builder(request);
+                    }
+
+                    @Override
+                    void handleResponse(AbstractResponse abstractResponse) {
+                        DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                        String cursorTopicName = "";
+                        int cursorPartitionId = -1;
+                        if (response.data().nextCursor() != null) {
+                            DescribeTopicPartitionsResponseData.Cursor cursor 
= response.data().nextCursor();
+                            cursorTopicName = cursor.topicName();
+                            cursorPartitionId = cursor.partitionIndex();
+                        }
+
+                        for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+                            String topicName = topic.name();
+                            Errors error = Errors.forCode(topic.errorCode());
+
+                            if (error != Errors.NONE) {
+                                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, 
error.exception()));
+                                if (cursorTopicName.equals(topicName)) {
+                                    cursorTopicName = "";
+                                    cursorPartitionId = -1;

Review Comment:
   ```suggestion
                                       responseCursor = null;
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
         return partitionInfo.leader();
     }
 
+    @Override
+    public void describeTopics(
+        TopicCollection topics,
+        DescribeTopicsOptions options,
+        AdminResultsSubscriber<DescribeTopicPartitionsResult> subscriber) {
+        if (topics instanceof TopicIdCollection) {
+            subscriber.onError(
+                new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+            );
+            return;
+        }
+        if (!(topics instanceof TopicNameCollection)) {
+            subscriber.onError(
+                new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+            );
+            return;
+        }
+
+        TreeSet<String> topicNames = new TreeSet<>();
+        ((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+            if (topicNameIsUnrepresentable(topicName)) {
+                
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request.")));
+            } else {
+                topicNames.add(topicName);
+            }
+        });
+
+        RecurringCall call = new RecurringCall(
+            "DescribeTopics-Recurring",
+            calcDeadlineMs(time.milliseconds(), options.timeoutMs()),
+            runnable
+        ) {
+
+            Map<String, TopicRequest> pendingTopics = new TreeMap<>();
+            Iterator<String> pendingTopicIterator = topicNames.iterator();
+
+            String partiallyFinishedTopicName = "";
+            int partiallyFinishedTopicNextPartitionId = -1;
+
+            @Override
+            Call generateCall() {
+                return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+                    @Override
+                    DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+                        DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+                            
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+
+                        if (!partiallyFinishedTopicName.isEmpty()) {
+                            request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+                                .setTopicName(partiallyFinishedTopicName)
+                                
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+                            );
+                        }
+
+                        for (int ii = pendingTopics.size(); ii < 
options.partitionSizeLimitPerResponse() && pendingTopicIterator.hasNext(); 
++ii) {
+                            String topicName = pendingTopicIterator.next();
+                            pendingTopics.put(topicName, new 
TopicRequest().setName(topicName));
+                        }
+                        
request.setTopics(pendingTopics.values().stream().collect(Collectors.toList()));
+
+                        return new 
DescribeTopicPartitionsRequest.Builder(request);
+                    }
+
+                    @Override
+                    void handleResponse(AbstractResponse abstractResponse) {
+                        DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+                        String cursorTopicName = "";
+                        int cursorPartitionId = -1;
+                        if (response.data().nextCursor() != null) {
+                            DescribeTopicPartitionsResponseData.Cursor cursor 
= response.data().nextCursor();
+                            cursorTopicName = cursor.topicName();
+                            cursorPartitionId = cursor.partitionIndex();

Review Comment:
   ```suggestion
                               responseCursor = response.data().nextCursor();
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,37 @@ public boolean isInternal() {
         }
     }
 
+    abstract class RecurringCall {
+        private final String name;
+        final long deadlineMs;
+        private final AdminClientRunnable runnable;

Review Comment:
   Does `runnable` need to be passed in? Can't `runnable` be accessed directly 
since `RecurringCall` is a non-static inner class of `KafkaAdminClient`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to