kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975


##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -47,8 +49,25 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
         return this;
     }
 
+    public DescribeTopicsOptions useDescribeTopicsApi(boolean 
useDescribeTopicsApi) {
+        this.useDescribeTopicsApi = useDescribeTopicsApi;
+        return this;
+    }

Review Comment:
   Can we add some comments here for a developer to know _why_ to use the 
topics API or not?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,36 @@ public boolean isInternal() {
         }
     }
 
+    abstract class RecurringCall {
+        private final String name;
+        final long deadlineMs;
+        private final AdminClientRunnable runnable;
+        KafkaFutureImpl<Boolean> nextRun;
+        abstract Call generateCall();
+
+        public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+            this.name = name;
+            this.deadlineMs = deadlineMs;
+            this.runnable = runnable;
+        }
+
+        public String toString() {
+            return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";

Review Comment:
   ```suggestion
               return "RecurringCall(name=" + name + ", deadlineMs=" + 
deadlineMs + ")";
   ```



##########
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##########
@@ -79,9 +95,24 @@ public List<Node> isr() {
         return isr;
     }
 
+    /**
+     * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+     */
+    public List<Node> elr() {
+        return elr;
+    }
+
+    /**
+     * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+     */
+    public List<Node> lastKnownElr() {
+        return lastKnownElr;
+    }
+
     public String toString() {
         return "(partition=" + partition + ", leader=" + leader + ", 
replicas=" +
-            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
")";
+            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+            Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")";

Review Comment:
   ```suggestion
               Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
", elr=" +
               Utils.join(elr, ", ") + ", lastKnownElr=" + 
Utils.join(lastKnownElr, ", ") + ")";
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,36 @@ public boolean isInternal() {
         }
     }
 
+    abstract class RecurringCall {
+        private final String name;
+        final long deadlineMs;
+        private final AdminClientRunnable runnable;
+        KafkaFutureImpl<Boolean> nextRun;
+        abstract Call generateCall();
+
+        public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+            this.name = name;
+            this.deadlineMs = deadlineMs;
+            this.runnable = runnable;
+        }
+
+        public String toString() {
+            return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";
+        }
+
+        public void run() {
+            try {
+                do {
+                    nextRun = new KafkaFutureImpl<>();
+                    Call call = generateCall();
+                    runnable.call(call, time.milliseconds());
+                } while (nextRun.get());
+            } catch (Exception e) {
+                log.info("Stop the recurring call " + name + " because " + e);

Review Comment:
   Are we specifically wanting to avoid outputting a stack trace?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
     public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
         if (topics instanceof TopicIdCollection)
             return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
-        else if (topics instanceof TopicNameCollection)
+        else if (topics instanceof TopicNameCollection) {
+            if (options.useDescribeTopicsApi()) {
+                return DescribeTopicsResult.ofTopicNameIterator(new 
DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), 
options));

Review Comment:
   It's been my experience that it's "dangerous" 😨 to run arbitrary user code 
from within the context of the client code. User code can (and will) do 
unpredictable things with state, errors, threads, etc. The surrounding code 
inside the client has to be very careful to make sure it handles many different 
cases.
   
   That said, I like the ergonomics of the `Consumer`-based approach over the 
iterator.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to