Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-05-29 Thread via GitHub


github-actions[bot] commented on PR #15265:
URL: https://github.com/apache/kafka/pull/15265#issuecomment-2138611194

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-04-18 Thread via GitHub


mumrah merged PR #15470:
URL: https://github.com/apache/kafka/pull/15470





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-04-05 Thread via GitHub


CalvinConfluent commented on PR #15470:
URL: https://github.com/apache/kafka/pull/15470#issuecomment-2040217099

   @mumrah Thanks for the review. Ticket filed.
   https://issues.apache.org/jira/browse/KAFKA-15579





Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-04-04 Thread via GitHub


mumrah commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1551680241


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2185,157 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+}
+
+    Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+        List<String> topicNamesList,
+        Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
+        Map<Integer, Node> nodes,
+        DescribeTopicsOptions options,
+        long now
+    ) {
+        Map<String, TopicRequest> topicsRequests = new LinkedHashMap<>();
+        topicNamesList.stream().sorted().forEach(topic -> {
+            topicsRequests.put(topic, new TopicRequest().setName(topic));
+        });
+        return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics = topicsRequests;

Review Comment:
   Why do we need this inner assignment? Can we make `topicsRequests` final in 
the outer scope?
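
   For illustration, the shape being suggested might look like this (a sketch only; the anonymous `Call` simply captures the effectively-final map from the enclosing method):
   ```java
   final Map<String, TopicRequest> topicsRequests = new LinkedHashMap<>();
   topicNamesList.stream().sorted().forEach(
       topic -> topicsRequests.put(topic, new TopicRequest().setName(topic)));
   return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
           new LeastLoadedNodeProvider()) {
       @Override
       DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
           // topicsRequests is captured from the enclosing scope; no inner field is needed.
           return new DescribeTopicPartitionsRequest.Builder(
               new DescribeTopicPartitionsRequestData()
                   .setTopics(new ArrayList<>(topicsRequests.values()))
                   .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()));
       }
       // handleResponse / handleFailure as in the diff above (omitted here).
   };
   ```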



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthoriz
         return this;
     }
 
+    /**
+     * Whether to use the DescribeTopicPartitions API. It should be set to false if the
+     * DescribeTopicPartitions API is not supported.
+     */
+    public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean useDescribeTopicPartitionsApi) {
+        this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+        return this;
+    }
+
+    // Note that partitionSizeLimitPerResponse will not be effective if it is larger than the config
+    // max.request.partition.size.limit on the server side.

Review Comment:
   @CalvinConfluent can you open a ticket for this so we don't forget about it?
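
   For context, a hypothetical usage of these options through the public Admin API (`admin` is an existing `Admin` client, and the `partitionSizeLimitPerResponse` setter is assumed to mirror the getter used elsewhere in the diff):
   ```java
   DescribeTopicsOptions options = new DescribeTopicsOptions()
       .useDescribeTopicPartitionsApi(true)
       .partitionSizeLimitPerResponse(2000);   // page size hint; capped by the server-side limit
   DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("my-topic"), options);
   TopicDescription description = result.topicNameValues().get("my-topic").get();
   ```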






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-29 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1544749533


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2184,155 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+}
+
+    Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+        List<String> topicNamesList,
+        Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
+        Map<Integer, Node> nodes,
+        DescribeTopicsOptions options,
+        long now
+    ) {
+        return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(),
+                        topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));

Review Comment:
   Thanks for the advice. Addressed






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-29 Thread via GitHub


mumrah commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1544478846


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2184,155 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+}
+
+    Call generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
+        List<String> topicNamesList,
+        Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
+        Map<Integer, Node> nodes,
+        DescribeTopicsOptions options,
+        long now
+    ) {
+        return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(),
+                        topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));

Review Comment:
   I think this would be more readable if we just create a TreeMap and populate 
it in a for loop. Since we are using this map like a queue, it might be better 
to use LinkedHashMap (after sorting the topics). Really, we don't even need a 
map since we can build the TopicRequest-s on demand in createRequest. 
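
   A rough sketch of that shape (illustrative only, not the final code):
   ```java
   // Sort the topic names once, then populate a LinkedHashMap in a plain for loop so the
   // map keeps insertion order and can be consumed like a queue.
   List<String> sortedTopicNames = new ArrayList<>(topicNamesList);
   Collections.sort(sortedTopicNames);
   Map<String, TopicRequest> pendingTopics = new LinkedHashMap<>();
   for (String topicName : sortedTopicNames) {
       pendingTopics.put(topicName, new TopicRequest().setName(topicName));
   }
   ```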






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526696015


##
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##
@@ -639,6 +639,25 @@ public void testDescribe(String quorum) throws 
ExecutionException, InterruptedEx
 assertTrue(rows[0].startsWith(String.format("Topic: %s", 
testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + 
rows[0]);
 }
 
+    @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"quorum=zk", "quorum=kraft"})
+    public void testDescribeWithDescribeTopicPartitionsApi(String quorum) throws ExecutionException, InterruptedException {
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 3, (short) 2,
+            scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+        );
+        String secondTopicName = "test-2";
+        TestUtils.createTopicWithAdmin(adminClient, secondTopicName, scalaBrokers, scalaControllers, 3, (short) 2,
+            scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+        );
+
+        String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
+            "--describe", "--partition-size-limit-per-response=1"));
+        String[] rows = output.split("\n");
+        assertEquals(8, rows.length, String.join("\n", rows));
+        assertTrue(rows[2].contains("\tElr"), rows[2]);
+        assertTrue(rows[2].contains("LastKnownElr"), rows[2]);
+    }
+
 @ParameterizedTest(name = 
ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)

Review Comment:
   Thanks for the advice. Updated.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526695825


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -799,6 +820,11 @@ public TopicCommandOptions(String[] args) {
 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
 excludeInternalTopicOpt = parser.accepts("exclude-internal",
 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+partitionSizeLimitPerResponseOpt = 
parser.accepts("partition-size-limit-per-response",
+"the maximum partition size to be included in one 
DescribeTopicPartitions response. Only valid if use-describe-topics-api is 
used")

Review Comment:
   Good catch. removed.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526695578


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+// If the previous cursor points to the partition 0, the 
cursor will not be set as the first one
+// in the topic list should be the previous cursor topic.
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+// The topicDescription for the cursor topic of the current 
batch.
+TopicDescription nextTopicDescription = null;
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526690281


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;

Review Comment:
   The current plan is to try the new API first, and if the cluster does not 
support the API, it switches to the Metadata API.
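
   A conceptual sketch of that dispatch (the `useDescribeTopicPartitionsApi()` getter and the exact fallback mechanics are assumptions; `handleDescribeTopicsByNames` is the existing Metadata-based handler):
   ```java
   // Conceptual sketch only (return-type plumbing omitted): the option gates the new path,
   // and the plan is to fall back to the unchanged Metadata API path when the cluster does
   // not support DescribeTopicPartitions.
   if (options.useDescribeTopicPartitionsApi()) {
       return handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(topicNames, options);
   }
   return handleDescribeTopicsByNames(topicNames, options);
   ```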






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526685631


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2256,6 +2384,26 @@ void handleFailure(Throwable throwable) {
 return new HashMap<>(topicFutures);
 }
 
+    private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
+        DescribeTopicPartitionsResponseTopic topic,
+        Map<Integer, Node> nodes
+    ) {
+        List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
+        List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+        for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                partitionInfo.partitionIndex(),
+                nodes.get(partitionInfo.leaderId()),
+                partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList()));
+            partitions.add(topicPartitionInfo);
+        }
+
+        partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));

Review Comment:
   Good catch, removed the sorting. The server already guarantees the partition 
order.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526680281


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})

Review Comment:
   The `handleResponse` method still reports NPathComplexity after splitting the 
outer method, and it is not worth splitting it further, so the NPathComplexity 
suppression is now applied only there.
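
   In other words, roughly the following (the diff above already uses method-level `@SuppressWarnings` for these checkstyle rules, so this just narrows the scope):
   ```java
   Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
           new LeastLoadedNodeProvider()) {

       @Override
       @SuppressWarnings("NPathComplexity")
       void handleResponse(AbstractResponse abstractResponse) {
           // ... the topic/error/cursor branching that still trips the check lives here ...
       }
   };
   ```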



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2256,6 +2384,26 @@ void handleFailure(Throwable throwable) {
 return new HashMap<>(topicFutures);
 }
 
+    private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
+        DescribeTopicPartitionsResponseTopic topic,
+        Map<Integer, Node> nodes
+    ) {
+        List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
+        List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+        for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                partitionInfo.partitionIndex(),
+                nodes.get(partitionInfo.leaderId()),
+                partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList()));

Review Comment:
   Done.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-15 Thread via GitHub


mumrah commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526390858


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})

Review Comment:
   Instead of suppressing these warnings, can we refactor the method to be less 
complex? For example, the Call instance could be defined elsewhere.
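
   For illustration, one possible shape (a sketch only; later revisions of the PR move in this direction with a `generateDescribeTopicsCallWithDescribeTopicPartitionsApi` helper):
   ```java
   // Sketch: extract the anonymous Call into a helper so the outer handler stays small
   // enough for the complexity checks.
   private Map<String, KafkaFutureImpl<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
       final Collection<String> topicNames,
       DescribeTopicsOptions options
   ) {
       // ... validate topic names, build topicFutures, resolve the cluster nodes ...
       Call call = generateDescribeTopicsCallWithDescribeTopicPartitionsApi(
           topicNamesList, topicFutures, nodes, options, now);
       runnable.call(call, now);
       return new HashMap<>(topicFutures);
   }
   ```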



##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -40,12 +42,26 @@ public class TopicPartitionInfo {
  * @param replicas the replicas of the partition in the same order as the replica assignment
  *                 (the preferred replica is the head of the list)
  * @param isr the in-sync replicas
+ * @param elr the eligible leader replicas
+ * @param lastKnownElr the last known eligible leader replicas
  */
+public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr,
+                          List<Node> elr, List<Node> lastKnownElr) {

Review Comment:
   nit: use this style if you need more than one line
   ```java
   Foo(
       int x,
       int y
   ) {
   ```
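
   Applied to the new constructor above, that would read roughly as:
   ```java
   public TopicPartitionInfo(
       int partition,
       Node leader,
       List<Node> replicas,
       List<Node> isr,
       List<Node> elr,
       List<Node> lastKnownElr
   ) {
       // ...
   }
   ```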



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;

Review Comment:
   Why did we remove this optimization? It seems like we should try to keep the 
Metadata API code path unchanged if possible.
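
   For reference, the guard being discussed (as it appears in the removed lines above):
   ```java
   if (!topicNamesList.isEmpty()) {
       runnable.call(call, now);
   }
   ```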



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+// If the previous cursor points to the partition 0, the 
cursor will not be 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-13 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1523996339


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2185,9 +2179,144 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+// If the previous cursor points to the partition 0, the 
cursor will not be set as the first one
+// in the topic list should be the previous cursor topic.
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+// The topicDescription for the cursor topic of the current 
batch.
+TopicDescription nextTopicDescription = null;
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-13 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1523971792


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Correct, updated the comments.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-13 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1523702932


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Ideally we should drive the pagination from the cursor, rather than spreading 
the logic across multiple pieces of state. We plan to get rid of the topic list 
eventually, and this logic would be easy to miss.
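
   For illustration, a cursor-driven shape could look roughly like this (a sketch only; it assumes the response cursor exposes `topicName()`/`partitionIndex()` as used in the diff above):
   ```java
   // Keep a single cursor field and derive every request from it, instead of rebuilding
   // the cursor from the partially finished TopicDescription.
   DescribeTopicPartitionsResponseData.Cursor nextCursor = null;

   @Override
   DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
       DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
           .setTopics(new ArrayList<>(pendingTopics.values()))
           .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
       if (nextCursor != null) {
           request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
               .setTopicName(nextCursor.topicName())
               .setPartitionIndex(nextCursor.partitionIndex()));
       }
       return new DescribeTopicPartitionsRequest.Builder(request);
   }

   @Override
   void handleResponse(AbstractResponse abstractResponse) {
       DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
       // ... merge the returned topics into topicFutures, drop finished topics from pendingTopics ...
       nextCursor = response.data().nextCursor();   // null means the listing is complete
   }
   ```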



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+continue;
+}
+
+  

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520498085


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {

Review Comment:
   Because we provide the topic list, and the list only contains the topics we 
have not yet received results for, we can skip setting the cursor for topics 
resuming at partition 0. Such a topic will be the first one in the Topics field 
anyway.
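
   In code form, the idea is roughly (mirroring the snippet above):
   ```java
   // Only attach a cursor when resuming a topic mid-way. A topic whose next partition is 0
   // needs no cursor: with the sorted pending topic list it is the first entry in the
   // request's Topics field anyway.
   if (partiallyFinishedTopicDescription != null) {
       request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
           .setTopicName(partiallyFinishedTopicDescription.name())
           .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()));
   }
   ```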






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520495884


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+continue;
+}
+
+  

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-11 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1520443872


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,139 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings({"MethodLength", "NPathComplexity"})
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
 }
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e.getCause());
+return new HashMap<>(topicFutures);
+}
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+TopicDescription partiallyFinishedTopicDescription = null;
+TopicDescription nextTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (partiallyFinishedTopicDescription != null) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicDescription.name())
+
.setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+);
+}
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (partiallyFinishedTopicDescription != null && 
partiallyFinishedTopicDescription.name().equals(topicName)) {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+continue;
+}
+
+

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-07 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1516873731


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e);
+return new HashMap<>(topicFutures);
 }
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {

Review Comment:
   But the user may not have specified the topic list at all. Is it a good 
experience to show them an error for a topic they didn't specify that got 
deleted?



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515290353


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e);
+return new HashMap<>(topicFutures);
 }
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {

Review Comment:
   Because the current implementation sends the topic list with the cursor, if 
the cursor topic is missing from the topic list, the request will fail with the 
invalid request error.
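   To make that failure mode concrete, a rough sketch of the validation is below. It is not the broker code; the class and method names are made up, and only the request/cursor accessors visible in the quoted diff are assumed.

   ```
   import org.apache.kafka.common.errors.InvalidRequestException;
   import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;

   // Illustrative only: a cursor that names a topic absent from the request's
   // topic list is rejected as an invalid request.
   public class CursorValidationSketch {
       static void validateCursor(DescribeTopicPartitionsRequestData request) {
           DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
           if (cursor == null) {
               return;
           }
           boolean cursorTopicRequested = request.topics().stream()
               .anyMatch(topic -> topic.name().equals(cursor.topicName()));
           if (!cursorTopicRequested) {
               throw new InvalidRequestException("Cursor topic " + cursor.topicName()
                   + " is not contained in the request's topic list.");
           }
       }
   }
   ```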



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515289272


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new HashMap<>(topicFutures);
+}
+
+// First, we need to retrieve the node info.
+DescribeClusterResult clusterResult = describeCluster();
+Map nodes;
+try {
+nodes = 
clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> 
node));
+} catch (InterruptedException | ExecutionException e) {
+completeAllExceptionally(topicFutures.values(), e);
+return new HashMap<>(topicFutures);
 }
+
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);
+pendingTopics.remove(topicName);
+if (responseCursor != null && 
responseCursor.topicName().equals(topicName)) {
+responseCursor = null;
+}
+continue;
+}
+
+TopicDescription currentTopicDescription = 
getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+if (requestCursor != null && 
requestCursor.topicName().equals(topicName)) {
+if (partiallyFinishedTopicDescription == null) {
+// The previous round cursor can point to the 
partition 0 of the next topic.
+partiallyFinishedTopicDescription = 
currentTopicDescription;
+} else {
+
partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+}
+
+if (responseCursor == null 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515288246


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);

Review Comment:
   Yes, we don't need to remove it. 
   Also added a failure handling test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


artemlivshits commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513820234


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2177,122 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
+new LeastLoadedNodeProvider()) {
+Map pendingTopics =
+topicNamesList.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, 
TreeMap::new));
+
+DescribeTopicPartitionsRequestData.Cursor requestCursor = null;
+TopicDescription partiallyFinishedTopicDescription = null;
+
+@Override
+DescribeTopicPartitionsRequest.Builder createRequest(int 
timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+.setTopics(new ArrayList<>(pendingTopics.values()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+request.setCursor(requestCursor);
+return new DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) {
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+DescribeTopicPartitionsResponseData.Cursor responseCursor = 
response.data().nextCursor();
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+future.completeExceptionally(error.exception());
+topicFutures.remove(topicName);

Review Comment:
   This will be executed concurrently with the thread that called 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi, so we may modify 
topicFutures while the HashMap constructor at line 2313 is iterating over it, 
and HashMap is not a thread-safe collection.
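   A minimal sketch of one way to avoid that race, matching the later resolution of not removing entries. The class and method names are made up, not the PR code: the calling thread populates the shared map before the Call is submitted, and the network thread only completes futures (KafkaFutureImpl is thread-safe) without structurally modifying the map.

   ```
   import org.apache.kafka.clients.admin.TopicDescription;
   import org.apache.kafka.common.KafkaFuture;
   import org.apache.kafka.common.internals.KafkaFutureImpl;

   import java.util.HashMap;
   import java.util.Map;

   public class FuturesOwnershipSketch {
       // Populated once on the calling thread, before the Call is handed to the runnable.
       private final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>();

       // Calling thread: register all futures up front and hand out a copy.
       Map<String, KafkaFuture<TopicDescription>> register(Iterable<String> topicNames) {
           for (String name : topicNames) {
               topicFutures.put(name, new KafkaFutureImpl<>());
           }
           return new HashMap<>(topicFutures);
       }

       // Network thread: complete entries, but never remove them.
       void failTopic(String topicName, Throwable error) {
           KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
           if (future != null) {
               future.completeExceptionally(error);
           }
       }
   }
   ```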



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2184,9 +2178,138 @@ void handleFailure(Throwable throwable) {
 completeAllExceptionally(topicFutures.values(), throwable);
 }
 };
-if (!topicNamesList.isEmpty()) {
-runnable.call(call, now);
+return call;
+}
+
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+final Collection topicNames,
+DescribeTopicsOptions options
+) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+
+if (topicNamesList.isEmpty()) {
+return new 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515057898


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Updated the server-side code and added more tests with invalid cursor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1515057124


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   Now it will query the cluster info for the nodes.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2399,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using 
DescribeTopicPartitions API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);

Review Comment:
   Good point. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1514119801


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   Maybe you should create a new constructor for TopicPartitionInfo that 
doesn't require a Node. This faking is a bit ugly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1514117940


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   The client sends one. There will be multiple client implementations over 
time, so it's prudent to ensure that a malformed cursor is caught properly, 
just in case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-06 Thread via GitHub


dajac commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1514092183


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2399,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using 
DescribeTopicPartitions API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);

Review Comment:
   Don't we break the contract of the admin API if we do this? The expectation 
is that the real nodes are returned. We could perhaps get them from the 
metadata?
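   That is roughly the direction the revision quoted above ends up taking, via the describeCluster() lookup. A sketch of the idea, with made-up helper names and an arbitrary placeholder fallback:

   ```
   import org.apache.kafka.common.Node;

   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class NodeResolutionSketch {
       // Build an id -> Node map once from the cluster metadata.
       static Map<Integer, Node> nodesById(List<Node> clusterNodes) {
           return clusterNodes.stream().collect(Collectors.toMap(Node::id, node -> node));
       }

       // Resolve a replica id against the real cluster nodes; only fall back to a
       // placeholder if the id is not present in the metadata.
       static Node resolve(Map<Integer, Node> nodesById, int replicaId) {
           return nodesById.getOrDefault(replicaId, new Node(replicaId, "", -1));
       }
   }
   ```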



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   Right now I don't think this config is useful because we are not doing the 
client-side pagination. The config only makes sense if one batch of partitions 
is large enough to cause client-side OOM.
   Maybe we should add this config in the future? What do you think?
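   For reference, the option under discussion appears in the quoted diff as DescribeTopicsOptions.partitionSizeLimitPerResponse. A minimal usage sketch, with an arbitrary limit and a made-up wrapper method:

   ```
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.DescribeTopicsOptions;
   import org.apache.kafka.clients.admin.DescribeTopicsResult;
   import org.apache.kafka.common.TopicCollection;

   import java.util.Collection;

   public class PartitionLimitUsageSketch {
       // Cap how many partitions each DescribeTopicPartitions response carries;
       // the broker may still enforce a smaller limit of its own.
       static DescribeTopicsResult describe(Admin admin, Collection<String> topics) {
           return admin.describeTopics(
               TopicCollection.ofTopicNames(topics),
               new DescribeTopicsOptions().partitionSizeLimitPerResponse(500));
       }
   }
   ```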



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513646133


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   DescribeTopicPartitions does not provide the node info the way the Metadata API 
does. However, the TopicPartitionInfo constructor requires node info, even though 
the node info is not useful in the describeTopics scenario.



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -537,6 +544,18 @@ public Map 
listAllReassignments(Set

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513643851


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   The client can only know if the server-side limit is greater when the result 
is received. 
   Actually, I think this config is only useful for testing. I am not sure 
whether any user will bother to change this config. What do you think?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641333


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) {
 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
 excludeInternalTopicOpt = parser.accepts("exclude-internal",
 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+partitionSizeLimitPerResponseOpt = 
parser.accepts("partition-size-limit-per-response",

Review Comment:
   user-describe-topics-api is removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513641009


##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Do you mean the server returns an invalid cursor or the client sends one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640718


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) {
 return new HashMap<>(topicFutures);
 }
 
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640577


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.
+public DescribeTopicsOptions partitionSizeLimitPerResponse(int 
partitionSizeLimitPerResponse) {
+this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
+return this;
+}
+
 public boolean includeAuthorizedOperations() {
 return includeAuthorizedOperations;
 }
 
+public boolean useDescribeTopicPartitionsApi() {

Review Comment:
   removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


CalvinConfluent commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513640434


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Removed useDescribeTopicPartitionsApi. It was only used in the 
UnsupportedVersionException retry, but now I figured out a way to retry in the 
Call framework.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


kirktrue commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513390603


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.

Review Comment:
   Is there a warning logged in the case where the client sends a limit greater 
than what's allowed on the broker?



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -537,6 +544,18 @@ public Map 
listAllReassignments(Set(topicFutures);
 }
 
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),

Review Comment:
   AFAICT, the `callName` used within the `Call` object is only used for 
logging. That said, there's no point in confusing the user who's looking 
through the logs.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2415,11 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+// This is used in the describe topics path if using DescribeTopics API.
+private Node replicaToFakeNode(int id) {
+return new Node(id, "Dummy", 0);
+}

Review Comment:
   Just for my own understanding, why do we favor creating _fake_ nodes instead 
of looking up the _real_ nodes from the metadata or something?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Sorry for being daft, but when would the user know to set this one way or 
the other? Is this something that can be handled under the covers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


dajac commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513322953


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Shouldn't the selection be automatic? I don't think users will bother about 
this. Basically, the new API should be used when available and the old one when 
not.
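   A compile-level sketch of that fallback shape. The interface and method names here are invented, and the merged change performs the selection inside KafkaAdminClient itself; this only illustrates the "try the new path, fall back on UnsupportedVersionException" idea:

   ```
   import org.apache.kafka.common.errors.UnsupportedVersionException;

   import java.util.concurrent.ExecutionException;

   public class DescribeFallbackSketch {
       // Stand-in for either code path: the DescribeTopicPartitions-based call or
       // the Metadata-based call.
       interface DescribePath {
           void run() throws ExecutionException, InterruptedException;
       }

       static void describeWithFallback(DescribePath newPath, DescribePath legacyPath)
               throws ExecutionException, InterruptedException {
           try {
               newPath.run();
           } catch (ExecutionException e) {
               if (e.getCause() instanceof UnsupportedVersionException) {
                   legacyPath.run();  // broker does not support the new API
               } else {
                   throw e;
               }
           }
       }
   }
   ```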



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


dajac commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513322953


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   Shouldn't the selection be automatic? I don't think users will bother about 
this. Basically, the new API should be used when available and the old one when 
not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-05 Thread via GitHub


AndrewJSchofield commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1513310569


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2190,6 +2201,117 @@ void handleFailure(Throwable throwable) {
 return new HashMap<>(topicFutures);
 }
 
+@SuppressWarnings("MethodLength")
+private Map> 
handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(final 
Collection topicNames, DescribeTopicsOptions options) {
+final Map> topicFutures = 
new HashMap<>(topicNames.size());
+final ArrayList topicNamesList = new ArrayList<>();
+for (String topicName : topicNames) {
+if (topicNameIsUnrepresentable(topicName)) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new InvalidTopicException("The 
given topic name '" +
+topicName + "' cannot be represented in a request."));
+topicFutures.put(topicName, future);
+} else if (!topicFutures.containsKey(topicName)) {
+topicFutures.put(topicName, new KafkaFutureImpl<>());
+topicNamesList.add(topicName);
+}
+}
+final long now = time.milliseconds();
+Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),

Review Comment:
   Should this be "describeTopicPartitions"?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {

Review Comment:
   I think you need a small change to KIP-966 to document these changes to the 
admin API.



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -799,6 +823,11 @@ public TopicCommandOptions(String[] args) {
 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
 excludeInternalTopicOpt = parser.accepts("exclude-internal",
 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+partitionSizeLimitPerResponseOpt = 
parser.accepts("partition-size-limit-per-response",

Review Comment:
   In other cases where a new API has been introduced, I think the principle 
followed is to try the new one without an option, and fall back if it is 
detected that the fallback is required. That would be much nicer than expecting 
the innocent user to understand what `user-describe-topics-api` means.



##
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##
@@ -1399,6 +1404,184 @@ public void testInvalidTopicNames() throws Exception {
 }
 }
 
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Could we have a test with an invalid cursor?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,32 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+/**
+ * Whether to use the DescribeTopicPartitions API. It should be set to 
false if DescribeTopicPartitions API is
+ * not supported.
+ *
+ */
+public DescribeTopicsOptions useDescribeTopicPartitionsApi(boolean 
useDescribeTopicPartitionsApi) {
+this.useDescribeTopicPartitionsApi = useDescribeTopicPartitionsApi;
+return this;
+}
+
+// Note that, partitionSizeLimitPerResponse will not be effective if it is 
larger than the config
+// max.request.partition.size.limit on the server side.
+public DescribeTopicsOptions partitionSizeLimitPerResponse(int 
partitionSizeLimitPerResponse) {
+this.partitionSizeLimitPerResponse = partitionSizeLimitPerResponse;
+return this;
+}
+
 public boolean includeAuthorizedOperations() {
 return includeAuthorizedOperations;
 }
 
+public boolean useDescribeTopicPartitionsApi() {

Review Comment:
   I suggest just `useDescribeTopicPartitions()`. In the Javadoc, you can 
mention that it's using the DescribeTopicPartitions API under the covers. Most 
users of the Kafka admin client would consider `KafkaAdminClient` to be the 
API, rather than the Kafka protocol which is what is meant here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-04 Thread via GitHub


CalvinConfluent commented on PR #15470:
URL: https://github.com/apache/kafka/pull/15470#issuecomment-1977980848

   As discussed offline, we decided not to implement the pagination handling in 
this ticket. Here is the load-all-in-memory version of 
https://github.com/apache/kafka/pull/15265
   @artemlivshits @mumrah @kirktrue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15585: DescribeTopicPartitions client side change. [kafka]

2024-03-04 Thread via GitHub


CalvinConfluent opened a new pull request, #15470:
URL: https://github.com/apache/kafka/pull/15470

   https://issues.apache.org/jira/browse/KAFKA-15585
   Add the support for DescribeTopicPartitions API on the client side.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-29 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506982605


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2276,6 +2337,141 @@ private Node leader(PartitionInfo partitionInfo) {
 return partitionInfo.leader();
 }
 
+@Override
+public void describeTopics(
+TopicCollection topics,
+DescribeTopicsOptions options,
+AdminResultsSubscriber subscriber) {
+if (topics instanceof TopicIdCollection) {
+subscriber.onError(
+new IllegalArgumentException("Currently the describeTopics 
subscription mode does not support topic IDs.")
+);
+return;
+}
+if (!(topics instanceof TopicNameCollection)) {
+subscriber.onError(
+new IllegalArgumentException("The TopicCollection: " + topics 
+ " provided did not match any supported classes for describeTopics.")
+);
+return;
+}
+
+TreeSet topicNames = new TreeSet<>();
+((TopicNameCollection) topics).topicNames().forEach(topicName -> {
+if (topicNameIsUnrepresentable(topicName)) {
+
subscriber.onNext(DescribeTopicPartitionsResult.onError(topicName, new 
InvalidTopicException("The given topic name '" +
+topicName + "' cannot be represented in a request.")));
+} else {
+topicNames.add(topicName);
+}
+});
+
+RecurringCall call = new RecurringCall(

Review Comment:
   I'm still not sure why we need the RecurringCall; I think something like 
this should be much less code:
   
   ```
  ArrayBlockingQueue<DescribeTopicPartitionsResult> results = new ArrayBlockingQueue<>(5);
 Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()), new LeastLoadedNodeProvider()) {
   DescribeTopicPartitionsRequestData.Cursor currentCursor = new 
DescribeTopicPartitionsRequestData.Cursor();
   // <...>
   @Override
   void handleResponse(AbstractResponse abstractResponse) {
 // ... Do the needful ...
 results.put(...);
 // ...
  if (hasMore) {
// ... Set new cursor ...
// ...
runnable.call(this, now);
  } else {
results.put(null);
  }
   }
   // <...>
 }
   
 runnable.call(call, time.milliseconds());
 while (true) {
   DescribeTopicPartitionsResult result = results.take();
   if (result == null)
 break;
   subscriber.onNext(result);
 }
   ```
   
   And we won't need to create extra threads in TopicCommand, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-28 Thread via GitHub


kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506743625


##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List eligibleLeaderReplicas() {
+return elr;
+}
+
+/**
+ * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+ */
+public List lastKnownEligibleLeaderReplicas() {
+return lastKnownElr;
+}

Review Comment:
   Sorry, @CalvinConfluent, I've changed my mind.
   
   Using _ELR_ as an established acronym isn't too bad if one has read the KIP. 
I'd be happy with using `elr()` and `lastKnownElr()` as you originally had it.
   
   I would suggest if you do revert it, just explicitly call it out in the 
JavaDoc comments.
   
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-28 Thread via GitHub


kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1506629098


##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -335,6 +335,21 @@ default DescribeTopicsResult 
describeTopics(TopicCollection topics) {
  */
 DescribeTopicsResult describeTopics(TopicCollection topics, 
DescribeTopicsOptions options);
 
+/**
+ * Describe some topics in the cluster.
+ *
+ * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.
+ *
+ * @param topics  The topics to describe.
+ * @param options The options to use when describing the topics.
+ * @param subscriber The subscriber to consumer the results.
+ */
+default void describeTopics(
+TopicCollection topics,
+DescribeTopicsOptions options,
+AdminResultsSubscriber subscriber) {
+};

Review Comment:
   Just so I understand, is the default of a no-op intentional?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -30,6 +30,7 @@
 public class DescribeTopicsOptions extends 
AbstractOptions {
 
 private boolean includeAuthorizedOperations;
+private int partitionSizeLimitPerResponse;

Review Comment:
   Is there an upper limit (besides Integer.MAX_VALUE)?



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -367,6 +372,14 @@ public void printDescription() {
 .map(node -> node.toString())
 .collect(Collectors.joining(",")));
 }
+
+System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream()

Review Comment:
   ```suggestion
   System.out.print("\tELRs: " + 
info.eligibleLeaderReplicas().stream()
   ```



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -367,6 +372,14 @@ public void printDescription() {
 .map(node -> node.toString())
 .collect(Collectors.joining(",")));
 }
+
+System.out.print("\tElr: " + info.eligibleLeaderReplicas().stream()
+.map(node -> Integer.toString(node.id()))
+.collect(Collectors.joining(",")));
+System.out.print("\tLastKnownElr: " + 
info.lastKnownEligibleLeaderReplicas().stream()

Review Comment:
   ```suggestion
   System.out.print("\tLast Known ELRs: " + 
info.lastKnownEligibleLeaderReplicas().stream()
   ```



##
clients/src/main/java/org/apache/kafka/clients/admin/Admin.java:
##
@@ -335,6 +335,21 @@ default DescribeTopicsResult 
describeTopics(TopicCollection topics) {
  */
 DescribeTopicsResult describeTopics(TopicCollection topics, 
DescribeTopicsOptions options);
 
+/**
+ * Describe some topics in the cluster.
+ *
+ * When using topic IDs, this operation is supported by brokers with 
version 3.1.0 or higher.

Review Comment:
   What is the behavior when running sub-3.1.0?



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicPartitionsResult.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.protocol.Errors;
+
+import java.util.Collections;
+
+public class DescribeTopicPartitionsResult {
+final public TopicDescription topicDescription;
+final public Exception exception;

Review Comment:
   Have we considered using `Optional` to lessen the chance of 
spurious NPEs?
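   A sketch of the Optional-based shape being suggested. The class in the quoted diff exposes bare public final fields instead; the name here is suffixed with "Sketch" to make clear it is not the real class:

   ```
   import org.apache.kafka.clients.admin.TopicDescription;

   import java.util.Optional;

   public class DescribeTopicPartitionsResultSketch {
       private final Optional<TopicDescription> topicDescription;
       private final Optional<Exception> exception;

       // Exactly one of the two is expected to be present.
       public DescribeTopicPartitionsResultSketch(Optional<TopicDescription> topicDescription,
                                                  Optional<Exception> exception) {
           this.topicDescription = topicDescription;
           this.exception = exception;
       }

       public Optional<TopicDescription> topicDescription() {
           return topicDescription;
       }

       public Optional<Exception> exception() {
           return exception;
       }
   }
   ```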



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -994,6 +1002,37 @@ public boolean isInternal() {
 }
 }
 
+abstract class RecurringCall {
+private final String name;
+final long deadlineMs;
+private final AdminClientRunnable runnable;
+KafkaFutureImpl nextRun;
+abstract Call generateCall();
+
+public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+this.name = name;
+this.deadlineMs = deadlineMs;
+this.runnable = runnable;
+}
+

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-27 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1505044853


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -994,6 +1002,36 @@ public boolean isInternal() {
 }
 }
 
+abstract class RecurringCall {
+private final String name;
+final long deadlineMs;
+private final AdminClientRunnable runnable;
+KafkaFutureImpl nextRun;
+abstract Call generateCall();
+
+public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+this.name = name;
+this.deadlineMs = deadlineMs;
+this.runnable = runnable;
+}
+
+public String toString() {
+return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";
+}
+
+public void run() {
+try {
+do {
+nextRun = new KafkaFutureImpl<>();
+Call call = generateCall();
+runnable.call(call, time.milliseconds());
+} while (nextRun.get());
+} catch (Exception e) {
+log.info("Stop the recurring call " + name + " because " + e);

Review Comment:
   Added the stack trace.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-27 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1505044493


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,25 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+public DescribeTopicsOptions useDescribeTopicsApi(boolean 
useDescribeTopicsApi) {
+this.useDescribeTopicsApi = useDescribeTopicsApi;
+return this;
+}

Review Comment:
   Removed this option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-27 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1505044242


##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List elr() {
+return elr;
+}
+
+/**
+ * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+ */
+public List lastKnownElr() {
+return lastKnownElr;
+}
+
 public String toString() {
 return "(partition=" + partition + ", leader=" + leader + ", 
replicas=" +
-Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
")";
+Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")";

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-27 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1505043369


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) 
throws ExecutionException, I
 } else {
 ensureTopicExists(topics, opts.topic().orElse(""), 
!opts.ifExists());
 }
-List 
topicDescriptions = new ArrayList<>();
 
 if (!topicIds.isEmpty()) {
 Map 
descTopics =
 
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+describeTopicsFollowUp(new ArrayList<>(descTopics.values()), 
opts);
+return;
 }
 
 if (!topics.isEmpty()) {
-Map 
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+final int partitionSizeLimit = 
opts.partitionSizeLimitPerResponse().orElse(2000);
+try {
+Iterator>> 
descTopicIterator =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+new 
DescribeTopicsOptions().useDescribeTopicsApi(true)
+
.partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+while (descTopicIterator.hasNext()) {
+List 
topicDescriptions = new ArrayList<>();
+for (int counter = 0; counter < partitionSizeLimit && 
descTopicIterator.hasNext();) {
+org.apache.kafka.clients.admin.TopicDescription 
topicDescription = descTopicIterator.next().getValue().get();
+topicDescriptions.add(topicDescription);
+counter += topicDescription.partitions().size();
+}
+describeTopicsFollowUp(topicDescriptions, opts);

Review Comment:
   Refactored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-27 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1505039131


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) 
throws ExecutionException, I
 } else {
 ensureTopicExists(topics, opts.topic().orElse(""), 
!opts.ifExists());
 }
-List 
topicDescriptions = new ArrayList<>();
 
 if (!topicIds.isEmpty()) {
 Map 
descTopics =
 
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+describeTopicsFollowUp(new ArrayList<>(descTopics.values()), 
opts);
+return;
 }
 
 if (!topics.isEmpty()) {
-Map 
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+final int partitionSizeLimit = 
opts.partitionSizeLimitPerResponse().orElse(2000);
+try {
+Iterator>> 
descTopicIterator =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+new 
DescribeTopicsOptions().useDescribeTopicsApi(true)
+
.partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+while (descTopicIterator.hasNext()) {
+List 
topicDescriptions = new ArrayList<>();
+for (int counter = 0; counter < partitionSizeLimit && 
descTopicIterator.hasNext();) {
+org.apache.kafka.clients.admin.TopicDescription 
topicDescription = descTopicIterator.next().getValue().get();
+topicDescriptions.add(topicDescription);
+counter += topicDescription.partitions().size();
+}
+describeTopicsFollowUp(topicDescriptions, opts);
+}
+} catch (Exception e) {
+if (e.toString().contains("UnsupportedVersionException")) {
+// Retry the request with Metadata API.
+Map descTopics =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+new 
DescribeTopicsOptions().useDescribeTopicsApi(false)

Review Comment:
   Metadata API does not support pagination.



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) 
throws ExecutionException, I
 } else {
 ensureTopicExists(topics, opts.topic().orElse(""), 
!opts.ifExists());
 }
-List 
topicDescriptions = new ArrayList<>();
 
 if (!topicIds.isEmpty()) {
 Map 
descTopics =
 
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+describeTopicsFollowUp(new ArrayList<>(descTopics.values()), 
opts);
+return;
 }
 
 if (!topics.isEmpty()) {
-Map 
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+final int partitionSizeLimit = 
opts.partitionSizeLimitPerResponse().orElse(2000);
+try {
+Iterator>> 
descTopicIterator =
+
adminClient.describeTopics(TopicCollection.ofTopicNames(topics),
+new 
DescribeTopicsOptions().useDescribeTopicsApi(true)
+
.partitionSizeLimitPerResponse(partitionSizeLimit)).topicNameValuesIterator();
+while (descTopicIterator.hasNext()) {
+List 
topicDescriptions = new ArrayList<>();
+for (int counter = 0; counter < partitionSizeLimit && 
descTopicIterator.hasNext();) {
+org.apache.kafka.clients.admin.TopicDescription 
topicDescription = descTopicIterator.next().getValue().get();
+topicDescriptions.add(topicDescription);
+counter += topicDescription.partitions().size();
+}
+describeTopicsFollowUp(topicDescriptions, opts);
+}
+} catch (Exception e) {
+if (e.toString().contains("UnsupportedVersionException")) {
+

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-27 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1505035988


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##
@@ -36,28 +38,38 @@
 public class DescribeTopicsResult {
 private final Map> topicIdFutures;
 private final Map> nameFutures;
+private final Iterator>> 
nameFuturesIterator;
 
 @Deprecated
 protected DescribeTopicsResult(Map> 
futures) {
-this(null, futures);
+this(null, futures, null);
 }
 
 // VisibleForTesting
-protected DescribeTopicsResult(Map> 
topicIdFutures, Map> nameFutures) {
-if (topicIdFutures != null && nameFutures != null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be specified.");
-if (topicIdFutures == null && nameFutures == null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be null.");
+protected DescribeTopicsResult(
+Map> topicIdFutures,
+Map> nameFutures,
+Iterator>> 
nameFuturesIterator
+) {
+if (topicIdFutures != null && nameFutures != null && 
nameFuturesIterator != null)

Review Comment:
   Refactored.



##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##
@@ -36,28 +38,38 @@
 public class DescribeTopicsResult {
 private final Map> topicIdFutures;
 private final Map> nameFutures;
+private final Iterator>> 
nameFuturesIterator;
 
 @Deprecated
 protected DescribeTopicsResult(Map> 
futures) {
-this(null, futures);
+this(null, futures, null);
 }
 
 // VisibleForTesting
-protected DescribeTopicsResult(Map> 
topicIdFutures, Map> nameFutures) {
-if (topicIdFutures != null && nameFutures != null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be specified.");
-if (topicIdFutures == null && nameFutures == null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be null.");
+protected DescribeTopicsResult(
+Map> topicIdFutures,
+Map> nameFutures,
+Iterator>> 
nameFuturesIterator
+) {
+if (topicIdFutures != null && nameFutures != null && 
nameFuturesIterator != null)
+throw new IllegalArgumentException("topicIdFutures and nameFutures 
and nameFutureIterator cannot both be specified.");

Review Comment:
   Refactored.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-26 Thread via GitHub


kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503266038


##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List elr() {
+return elr;
+}
+
+/**
+ * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+ */
+public List lastKnownElr() {

Review Comment:
   And likewise here:
   
   ```suggestion
   public List lastKnownEligibleLeaderReplicas() {
   ```



##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List elr() {

Review Comment:
   Would you consider using the full name for the uninitiated?
   
   ```suggestion
   public List eligibleLeaderReplicas() {
   ```
   
   






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-26 Thread via GitHub


kirktrue commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1503175975


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsOptions.java:
##
@@ -47,8 +49,25 @@ public DescribeTopicsOptions 
includeAuthorizedOperations(boolean includeAuthoriz
 return this;
 }
 
+public DescribeTopicsOptions useDescribeTopicsApi(boolean 
useDescribeTopicsApi) {
+this.useDescribeTopicsApi = useDescribeTopicsApi;
+return this;
+}

Review Comment:
   Can we add some comments here for a developer to know _why_ to use the 
topics API or not?
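
   One possible shape for that javadoc, sketched from the trade-off discussed elsewhere in this thread (pagination support versus broker compatibility); the final wording is up to the PR author:
   ```java
   /**
    * Whether to use the DescribeTopicPartitions API instead of the Metadata API when
    * describing topics by name. The new API supports a per-response partition limit,
    * so large topic sets can be paged through without holding every partition in
    * memory at once, but it is only available on brokers that implement
    * DescribeTopicPartitions; leave this disabled when talking to older brokers.
    */
   public DescribeTopicsOptions useDescribeTopicsApi(boolean useDescribeTopicsApi) {
       this.useDescribeTopicsApi = useDescribeTopicsApi;
       return this;
   }
   ```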



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -994,6 +1002,36 @@ public boolean isInternal() {
 }
 }
 
+abstract class RecurringCall {
+private final String name;
+final long deadlineMs;
+private final AdminClientRunnable runnable;
+KafkaFutureImpl nextRun;
+abstract Call generateCall();
+
+public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+this.name = name;
+this.deadlineMs = deadlineMs;
+this.runnable = runnable;
+}
+
+public String toString() {
+return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";

Review Comment:
   ```suggestion
   return "RecurringCall(name=" + name + ", deadlineMs=" + 
deadlineMs + ")";
   ```



##
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##
@@ -79,9 +95,24 @@ public List isr() {
 return isr;
 }
 
+/**
+ * Return the eligible leader replicas of the partition. Note that the 
ordering of the result is unspecified.
+ */
+public List elr() {
+return elr;
+}
+
+/**
+ * Return the last known eligible leader replicas of the partition. Note 
that the ordering of the result is unspecified.
+ */
+public List lastKnownElr() {
+return lastKnownElr;
+}
+
 public String toString() {
 return "(partition=" + partition + ", leader=" + leader + ", 
replicas=" +
-Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
")";
+Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") +
+Utils.join(elr, ", ") + Utils.join(lastKnownElr, ", ") + ")";

Review Comment:
   ```suggestion
   Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + 
", elr=" +
   Utils.join(elr, ", ") + ", lastKnownElr=" + 
Utils.join(lastKnownElr, ", ") + ")";
   ```



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -994,6 +1002,36 @@ public boolean isInternal() {
 }
 }
 
+abstract class RecurringCall {
+private final String name;
+final long deadlineMs;
+private final AdminClientRunnable runnable;
+KafkaFutureImpl nextRun;
+abstract Call generateCall();
+
+public RecurringCall(String name, long deadlineMs, AdminClientRunnable 
runnable) {
+this.name = name;
+this.deadlineMs = deadlineMs;
+this.runnable = runnable;
+}
+
+public String toString() {
+return "RecurCall(name=" + name + ", deadlineMs=" + deadlineMs + 
")";
+}
+
+public void run() {
+try {
+do {
+nextRun = new KafkaFutureImpl<>();
+Call call = generateCall();
+runnable.call(call, time.milliseconds());
+} while (nextRun.get());
+} catch (Exception e) {
+log.info("Stop the recurring call " + name + " because " + e);

Review Comment:
   Are we specifically wanting to avoid outputting a stack trace?
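
   If the stack trace is wanted, it can be kept without string concatenation: assuming `log` is the client's SLF4J logger, passing the exception as the final argument makes it log the trace as well.
   ```java
   // Parameterized message plus trailing throwable: SLF4J prints the stack trace.
   log.info("Stopping the recurring call {}", name, e);
   ```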



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
 public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
 if (topics instanceof TopicIdCollection)
 return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
-else if (topics instanceof TopicNameCollection)
+else if (topics instanceof TopicNameCollection) {
+if (options.useDescribeTopicsApi()) {
+return DescribeTopicsResult.ofTopicNameIterator(new 
DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), 
options));

Review Comment:
   It's been my experience that it's "dangerous"  to run arbitrary user code 
from within the context of the client code. User code can (and will) do 
unpredictable things with state, errors, threads, etc. The surrounding code 
inside the client has to be very careful to make sure it 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-23 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1501002625


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Not doing pipelining in the first version sounds good to me.  We can 
optimize later if needed.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-22 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1499752721


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
 public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
 if (topics instanceof TopicIdCollection)
 return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
-else if (topics instanceof TopicNameCollection)
+else if (topics instanceof TopicNameCollection) {
+if (options.useDescribeTopicsApi()) {
+return DescribeTopicsResult.ofTopicNameIterator(new 
DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), 
options));

Review Comment:
   @mumrah If we need to spend more time on the public discussion, maybe we can start with a simpler version that pages at the topic level, and limit the number of topics loaded in memory (say, 50) to meet the short-term goal of avoiding OOM.
   cc @artemlivshits 
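
   A rough sketch of that simpler approach, reusing only the existing Metadata-based describeTopics call; handleBatch is a hypothetical per-batch processing step.
   ```java
   import java.util.List;
   import java.util.Map;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.TopicDescription;
   import org.apache.kafka.common.TopicCollection;

   // Describe the requested topics in fixed-size batches so that only one batch of
   // descriptions is held in memory at a time.
   void describeInBatches(Admin adminClient, List<String> topicNames) throws Exception {
       final int batchSize = 50;
       for (int i = 0; i < topicNames.size(); i += batchSize) {
           List<String> batch = topicNames.subList(i, Math.min(i + batchSize, topicNames.size()));
           Map<String, TopicDescription> described = adminClient
               .describeTopics(TopicCollection.ofTopicNames(batch))
               .allTopicNames()
               .get();
           handleBatch(described.values());   // hypothetical
       }
   }
   ```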






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-21 Thread via GitHub


mumrah commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1497780170


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   I think we can defer pipelining for now. If we do it the "dumb" way of just 
loading the next page of results as we drain the iterator, it will be slower, 
but it will work. 



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2108,9 +2146,12 @@ void handleFailure(Throwable throwable) {
 public DescribeTopicsResult describeTopics(final TopicCollection topics, 
DescribeTopicsOptions options) {
 if (topics instanceof TopicIdCollection)
 return 
DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) 
topics).topicIds(), options));
-else if (topics instanceof TopicNameCollection)
+else if (topics instanceof TopicNameCollection) {
+if (options.useDescribeTopicsApi()) {
+return DescribeTopicsResult.ofTopicNameIterator(new 
DescribeTopicPartitionsIterator(((TopicNameCollection) topics).topicNames(), 
options));

Review Comment:
   DescribeTopicsResult is a Map-based results class, which does not align with the streaming/paging approach we are trying to achieve with this new RPC. I don't think we should try to reuse DescribeTopicsResult for the new API.
   
   Something like
   ```
   KafkaAdminClient#describeTopics(Consumer);
   ```
   
   would be more natural for a streaming interface. With an interface like this, we just need to be concerned with efficiently paging through the results; the application can gather/filter/post-process the results as needed.
   
   Since this is a new public API, and really the first time doing streaming in KafkaAdminClient, I do think we should bring this up on the mailing list to collect feedback, e.g. whether we want to return an Iterator/Stream or accept a function like the one shown above.
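
   To make the shape of that proposal concrete, a hypothetical signature and call site might look like the following; none of this exists in KafkaAdminClient today, and Consumer<TopicDescription> is an assumption about the callback's element type.
   ```java
   import java.util.function.Consumer;
   import org.apache.kafka.clients.admin.DescribeTopicsOptions;
   import org.apache.kafka.clients.admin.TopicDescription;
   import org.apache.kafka.common.TopicCollection;

   // Hypothetical streaming variant: the client pages through DescribeTopicPartitions
   // responses internally and invokes the callback once per completed description.
   void describeTopics(TopicCollection.TopicNameCollection topics,
                       DescribeTopicsOptions options,
                       Consumer<TopicDescription> onDescription);

   // Possible call site in a tool such as TopicCommand:
   admin.describeTopics(TopicCollection.ofTopicNames(topicNames), new DescribeTopicsOptions(),
       d -> System.out.printf("%s: %d partitions%n", d.name(), d.partitions().size()));
   ```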






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-20 Thread via GitHub


CalvinConfluent commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1496355495


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Thanks for the advice. We still want to avoid loading a huge topic into memory, but the current implementation does not really achieve that; I will make a change to address it.
   Having more than one ongoing call in the pipeline is probably over-engineering, so the next iteration of this PR will keep at most one call in flight.






Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-20 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1495047191


##
clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java:
##
@@ -36,28 +38,38 @@
 public class DescribeTopicsResult {
 private final Map> topicIdFutures;
 private final Map> nameFutures;
+private final Iterator>> 
nameFuturesIterator;
 
 @Deprecated
 protected DescribeTopicsResult(Map> 
futures) {
-this(null, futures);
+this(null, futures, null);
 }
 
 // VisibleForTesting
-protected DescribeTopicsResult(Map> 
topicIdFutures, Map> nameFutures) {
-if (topicIdFutures != null && nameFutures != null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be specified.");
-if (topicIdFutures == null && nameFutures == null)
-throw new IllegalArgumentException("topicIdFutures and nameFutures 
cannot both be null.");
+protected DescribeTopicsResult(
+Map> topicIdFutures,
+Map> nameFutures,
+Iterator>> 
nameFuturesIterator
+) {
+if (topicIdFutures != null && nameFutures != null && 
nameFuturesIterator != null)
+throw new IllegalArgumentException("topicIdFutures and nameFutures 
and nameFutureIterator cannot both be specified.");

Review Comment:
   nit: using "both" implies that there are 2 things, we now have 3.
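
   A stricter form of that check counts the non-null inputs and requires exactly one, which also rejects the case where all of them are null; a sketch:
   ```java
   // Require exactly one of the three inputs to be non-null.
   long provided = java.util.stream.Stream.of(topicIdFutures, nameFutures, nameFuturesIterator)
           .filter(java.util.Objects::nonNull)
           .count();
   if (provided != 1) {
       throw new IllegalArgumentException(
           "Exactly one of topicIdFutures, nameFutures and nameFuturesIterator must be specified.");
   }
   ```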



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   I didn't quite get how the recurring call framework enables lazy generation; RecurringCall.run() just runs the whole thing in one go.



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {

Review Comment:
   Looking at the current logic, I cannot make sense of it. For some reason we get all the topic names that fit under the batch size and then fetch all the partitions for those topics and keep them in memory.
   
   Why bother with all the paging complexity if all we do is load the partitions of 2000 topics into memory? Can we just iterate topic by topic (it seems we get the list of all topics anyway) and fetch the partitions for each one?
   
   With paging, I would have expected something like this:
   
   1. Start iteration: issue one call; once the first call completes, maybe issue a few more (say 3) calls ahead for pipelining.
   2. Next: iterate over the current batch.
   3. Once the current batch is exhausted, switch to the next batch and pipeline one more call.
   
   This way, if the caller finishes iteration early or iterates slowly, we won't keep piling results up in memory.
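
   A minimal sketch of that scheme with a single prefetched page (the cursor for a page is only known once the previous response arrives, so deeper pipelining would need the topic list split up front). Page, Cursor and fetchPage are hypothetical stand-ins for one DescribeTopicPartitions round trip.
   ```java
   import java.util.Collections;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.concurrent.CompletableFuture;
   import org.apache.kafka.clients.admin.TopicDescription;

   class PagedDescriptionIterator implements Iterator<TopicDescription> {
       // Minimal stand-ins for one page of results and the response cursor.
       record Page(List<TopicDescription> descriptions, Cursor nextCursor) { }
       record Cursor(String topicName, int partitionIndex) {
           static final Cursor START = new Cursor("", 0);
       }

       private Iterator<TopicDescription> currentBatch = Collections.emptyIterator();
       private CompletableFuture<Page> nextPage = fetchPage(Cursor.START);   // first page in flight

       @Override
       public boolean hasNext() {
           while (!currentBatch.hasNext()) {
               if (nextPage == null) {
                   return false;                   // last page already consumed
               }
               Page page = nextPage.join();        // block only when more results are needed
               currentBatch = page.descriptions().iterator();
               // Prefetch the following page while the caller drains this one.
               nextPage = page.nextCursor() == null ? null : fetchPage(page.nextCursor());
           }
           return true;
       }

       @Override
       public TopicDescription next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           return currentBatch.next();
       }

       private CompletableFuture<Page> fetchPage(Cursor cursor) {
           // Hypothetical: issue one DescribeTopicPartitionsRequest starting at the cursor.
           throw new UnsupportedOperationException("sketch only");
       }
   }
   ```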



##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -558,20 +568,53 @@ public void describeTopic(TopicCommandOptions opts) 
throws ExecutionException, I
 } else {
 ensureTopicExists(topics, opts.topic().orElse(""), 
!opts.ifExists());
 }
-List 
topicDescriptions = new ArrayList<>();
 
 if (!topicIds.isEmpty()) {
 Map 
descTopics =
 
adminClient.describeTopics(TopicCollection.ofTopicIds(topicIds)).allTopicIds().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+describeTopicsFollowUp(new ArrayList<>(descTopics.values()), 
opts);
+return;
 }
 
 if (!topics.isEmpty()) {
-Map 
descTopics =
-
adminClient.describeTopics(TopicCollection.ofTopicNames(topics)).allTopicNames().get();
-topicDescriptions = new ArrayList<>(descTopics.values());
+final int partitionSizeLimit = 
opts.partitionSizeLimitPerResponse().orElse(2000);
+try {
+Iterator>> 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-15 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1491416954


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {
+Map pendingTopics =
+topicNames.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)
+);
 
-@Override
-MetadataRequest.Builder createRequest(int timeoutMs) {
-if (supportsDisablingTopicCreation)
-return new MetadataRequest.Builder(new 
MetadataRequestData()
-
.setTopics(convertToMetadataRequestTopic(topicNamesList))
-.setAllowAutoTopicCreation(false)
-
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
-else
-return MetadataRequest.Builder.allTopics();
-}
+String partiallyFinishedTopicName = "";
+int partiallyFinishedTopicNextPartitionId = -1;
+TopicDescription partiallyFinishedTopicDescription = null;
 
-@Override
-void handleResponse(AbstractResponse abstractResponse) {
-MetadataResponse response = (MetadataResponse) 
abstractResponse;
-// Handle server responses for particular topics.
-Cluster cluster = response.buildCluster();
-Map errors = response.errors();
-for (Map.Entry> 
entry : topicFutures.entrySet()) {
-String topicName = entry.getKey();
-KafkaFutureImpl future = 
entry.getValue();
-Errors topicError = errors.get(topicName);
-if (topicError != null) {
-future.completeExceptionally(topicError.exception());
-continue;
+@Override
+Call generateCall() {
+return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+@Override
+DescribeTopicPartitionsRequest.Builder 
createRequest(int timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+
.setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (!partiallyFinishedTopicName.isEmpty()) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+);
+}
+return new 
DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) 
{
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+String cursorTopicName = "";
+int cursorPartitionId = -1;
+if (response.data().nextCursor() != null) {
+DescribeTopicPartitionsResponseData.Cursor 
cursor = response.data().nextCursor();
+cursorTopicName = cursor.topicName();
+cursorPartitionId = cursor.partitionIndex();
+}
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = 
Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+
future.completeExceptionally(error.exception());
+  

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-14 Thread via GitHub


mumrah commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1489713255


##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {
+Map pendingTopics =
+topicNames.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)
+);
 
-@Override
-MetadataRequest.Builder createRequest(int timeoutMs) {
-if (supportsDisablingTopicCreation)
-return new MetadataRequest.Builder(new 
MetadataRequestData()
-
.setTopics(convertToMetadataRequestTopic(topicNamesList))
-.setAllowAutoTopicCreation(false)
-
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
-else
-return MetadataRequest.Builder.allTopics();
-}
+String partiallyFinishedTopicName = "";
+int partiallyFinishedTopicNextPartitionId = -1;
+TopicDescription partiallyFinishedTopicDescription = null;
 
-@Override
-void handleResponse(AbstractResponse abstractResponse) {
-MetadataResponse response = (MetadataResponse) 
abstractResponse;
-// Handle server responses for particular topics.
-Cluster cluster = response.buildCluster();
-Map errors = response.errors();
-for (Map.Entry> 
entry : topicFutures.entrySet()) {
-String topicName = entry.getKey();
-KafkaFutureImpl future = 
entry.getValue();
-Errors topicError = errors.get(topicName);
-if (topicError != null) {
-future.completeExceptionally(topicError.exception());
-continue;
+@Override
+Call generateCall() {
+return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+@Override
+DescribeTopicPartitionsRequest.Builder 
createRequest(int timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+
.setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (!partiallyFinishedTopicName.isEmpty()) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+);
+}
+return new 
DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) 
{
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+String cursorTopicName = "";
+int cursorPartitionId = -1;
+if (response.data().nextCursor() != null) {
+DescribeTopicPartitionsResponseData.Cursor 
cursor = response.data().nextCursor();
+cursorTopicName = cursor.topicName();
+cursorPartitionId = cursor.partitionIndex();
+}
+
+for (DescribeTopicPartitionsResponseTopic topic : 
response.data().topics()) {
+String topicName = topic.name();
+Errors error = 
Errors.forCode(topic.errorCode());
+
+KafkaFutureImpl future = 
topicFutures.get(topicName);
+if (error != Errors.NONE) {
+
future.completeExceptionally(error.exception());
+ 

Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-02-02 Thread via GitHub


artemlivshits commented on code in PR #15265:
URL: https://github.com/apache/kafka/pull/15265#discussion_r1476922434


##
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##
@@ -799,6 +814,13 @@ public TopicCommandOptions(String[] args) {
 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
 excludeInternalTopicOpt = parser.accepts("exclude-internal",
 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+useDescribeTopicsApiOpt = parser.accepts("use-describe-topics-api",

Review Comment:
   Instead of an explicit option, can we just try the DescribeTopics API if the broker supports it and otherwise fall back to the Metadata API?
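
   One way to do that without a user-facing flag, sketched under the assumption that the new path lives in a KafkaAdminClient Call and can use the existing handleUnsupportedVersionException hook (the same mechanism behind the supportsDisablingTopicCreation flag in the quoted diff): flip an internal flag and let the admin client retry the call with the older request type.
   ```java
   // Sketch only; this field and override are assumptions, not code from the PR.
   private boolean useDescribeTopicPartitions = true;

   @Override
   boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
       if (useDescribeTopicPartitions) {
           useDescribeTopicPartitions = false;   // next createRequest() builds a MetadataRequest
           return true;                          // ask the admin client to retry this call
       }
       return false;
   }
   ```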



##
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##
@@ -2129,63 +2167,183 @@ private Map> 
handleDescribeTopicsByNames(f
 }
 }
 final long now = time.milliseconds();
-Call call = new Call("describeTopics", calcDeadlineMs(now, 
options.timeoutMs()),
-new LeastLoadedNodeProvider()) {
 
-private boolean supportsDisablingTopicCreation = true;
+if (options.useDescribeTopicsApi()) {
+RecurringCall call = new RecurringCall("DescribeTopics-Recurring", 
calcDeadlineMs(now, options.timeoutMs()), runnable) {
+Map pendingTopics =
+topicNames.stream().map(topicName -> new 
TopicRequest().setName(topicName))
+.collect(Collectors.toMap(topicRequest -> 
topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)
+);
 
-@Override
-MetadataRequest.Builder createRequest(int timeoutMs) {
-if (supportsDisablingTopicCreation)
-return new MetadataRequest.Builder(new 
MetadataRequestData()
-
.setTopics(convertToMetadataRequestTopic(topicNamesList))
-.setAllowAutoTopicCreation(false)
-
.setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations()));
-else
-return MetadataRequest.Builder.allTopics();
-}
+String partiallyFinishedTopicName = "";
+int partiallyFinishedTopicNextPartitionId = -1;
+TopicDescription partiallyFinishedTopicDescription = null;
 
-@Override
-void handleResponse(AbstractResponse abstractResponse) {
-MetadataResponse response = (MetadataResponse) 
abstractResponse;
-// Handle server responses for particular topics.
-Cluster cluster = response.buildCluster();
-Map errors = response.errors();
-for (Map.Entry> 
entry : topicFutures.entrySet()) {
-String topicName = entry.getKey();
-KafkaFutureImpl future = 
entry.getValue();
-Errors topicError = errors.get(topicName);
-if (topicError != null) {
-future.completeExceptionally(topicError.exception());
-continue;
+@Override
+Call generateCall() {
+return new Call("describeTopics", this.deadlineMs, new 
LeastLoadedNodeProvider()) {
+@Override
+DescribeTopicPartitionsRequest.Builder 
createRequest(int timeoutMs) {
+DescribeTopicPartitionsRequestData request = new 
DescribeTopicPartitionsRequestData()
+
.setTopics(pendingTopics.values().stream().collect(Collectors.toList()))
+
.setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+if (!partiallyFinishedTopicName.isEmpty()) {
+request.setCursor(new 
DescribeTopicPartitionsRequestData.Cursor()
+.setTopicName(partiallyFinishedTopicName)
+
.setPartitionIndex(partiallyFinishedTopicNextPartitionId)
+);
+}
+return new 
DescribeTopicPartitionsRequest.Builder(request);
+}
+
+@Override
+void handleResponse(AbstractResponse abstractResponse) 
{
+DescribeTopicPartitionsResponse response = 
(DescribeTopicPartitionsResponse) abstractResponse;
+String cursorTopicName = "";
+int cursorPartitionId = -1;
+if (response.data().nextCursor() != null) {
+DescribeTopicPartitionsResponseData.Cursor 
cursor = 

[PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]

2024-01-25 Thread via GitHub


CalvinConfluent opened a new pull request, #15265:
URL: https://github.com/apache/kafka/pull/15265

   https://issues.apache.org/jira/browse/KAFKA-15585
   Add support for the DescribeTopicPartitions API on the client side.

