kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975

##########
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##########
@@ -47,8 +49,25 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz
         return this;
     }
+    public DescribeTopicsOptions useDescribeTopicsApi(boolean useDescribeTopicsApi) {
+        this.useDescribeTopicsApi = useDescribeTopicsApi;
+        return this;
+    }

Review Comment:
   Can we add some comments here for a developer to know _why_ to use the topics API or not?

##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,36 @@ public boolean isInternal() {
         }
     }
+    abstract class RecurringCall {
+        private final String name;
+        final long deadlineMs;
+        private final AdminClientRunnable runnable;
+        KafkaFutureImpl<Boolean> nextRun;
+        abstract Call generateCall();
+
+        public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) {
+            this.name = name;
+            this.deadlineMs = deadlineMs;
+            this.runnable = runnable;
+        }
+
+        public String toString() {
+            return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")";

Review Comment:
   ```suggestion
               return "RecurringCall(name=" + name + ", deadlineMs=" + deadlineMs + ")";
   ```

##########
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##########
@@ -79,9 +95,24 @@ public List<Node> isr() {
         return isr;
     }
+    /**
+     * Return the eligible leader replicas of the partition. Note that the ordering of the result is unspecified.
+     */
+    public List<Node> elr() {
+        return elr;
+    }
+
+    /**
+     * Return the last known eligible leader replicas of the partition. Note that the ordering of the result is unspecified.
+     */
+    public List<Node> lastKnownElr() {
+        return lastKnownElr;
+    }
+
     public String toString() {
         return "(partition=" + partition + ", leader=" + leader + ", replicas=" +
-            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")";
+            Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+            Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")";

Review Comment:
   ```suggestion
               Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
               ", elr=" + Utils.join(elr, ", ") + ", lastKnownElr=" + Utils.join(lastKnownElr, ", ") + ")";
   ```

##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -994,6 +1002,36 @@ public boolean isInternal() {
         }
     }
+    abstract class RecurringCall {
+        private final String name;
+        final long deadlineMs;
+        private final AdminClientRunnable runnable;
+        KafkaFutureImpl<Boolean> nextRun;
+        abstract Call generateCall();
+
+        public RecurringCall(String name, long deadlineMs, AdminClientRunnable runnable) {
+            this.name = name;
+            this.deadlineMs = deadlineMs;
+            this.runnable = runnable;
+        }
+
+        public String toString() {
+            return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + ")";
+        }
+
+        public void run() {
+            try {
+                do {
+                    nextRun = new KafkaFutureImpl<>();
+                    Call call = generateCall();
+                    runnable.call(call, time.milliseconds());
+                } while (nextRun.get());
+            } catch (Exception e) {
+                log.info("Stop the recurring call " + name + " because " + e);

Review Comment:
   Are we specifically wanting to avoid outputting a stack trace?

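For reference on the stack-trace question: concatenating the exception into the message only includes its `toString()` (class name plus message), so the trace is dropped. Below is a minimal, self-contained sketch of the difference. `StackTraceDemo` and its method names are hypothetical and not part of the PR, and `printStackTrace` is used only as a stand-in for what a logger would typically render when the exception is passed as a separate argument.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

// Hypothetical demo class, not part of the PR.
public class StackTraceDemo {

    // Mirrors the concatenation in the diff: only Throwable.toString() ends up in the message.
    public static String concatenated(String name, Exception e) {
        return "Stop the recurring call " + name + " because " + e;
    }

    // Stand-in for passing the exception separately, so the full trace is rendered.
    public static String withStackTrace(String name, Exception e) {
        StringWriter out = new StringWriter();
        PrintWriter writer = new PrintWriter(out);
        writer.println("Stop the recurring call " + name);
        e.printStackTrace(writer);
        writer.flush();
        return out.toString();
    }

    public static void main(String[] args) {
        Exception e = new IllegalStateException("boom");
        // Prints: Stop the recurring call describeTopics because java.lang.IllegalStateException: boom
        System.out.println(concatenated("describeTopics", e));
        // Prints the same message followed by the "at ..." frames.
        System.out.println(withStackTrace("describeTopics", e));
    }
}
```

If the stack trace is wanted, SLF4J-style loggers conventionally accept the exception as the last argument, for example `log.info("Stop the recurring call {}", name, e)`. If it is intentionally omitted, a short comment saying so would help.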
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
     public DescribeTopicsResult describeTopics(final TopicCollection topics, DescribeTopicsOptions options) {
         if (topics instanceof TopicIdCollection)
             return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options));
-        else if (topics instanceof TopicNameCollection)
+        else if (topics instanceof TopicNameCollection) {
+            if (options.useDescribeTopicsApi()) {
+                return DescribeTopicsResult.ofTopicNameIterator(new DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), options));

Review Comment:
   It's been my experience that it's "dangerous" 😨 to run arbitrary user code from within the context of the client code. User code can (and will) do unpredictable things with state, errors, threads, etc. The surrounding code inside the client has to be very careful to make sure it handles many different cases.

   That said, I like the ergonomics of the `Consumer`-based approach over the iterator.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org