mumrah commented on code in PR #15470:
URL: https://github.com/apache/kafka/pull/15470#discussion_r1526390858


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})

Review Comment:
   Instead of suppressing these warnings, can we refactor the method to be less complex? For example, the Call instance could be defined elsewhere.
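   A minimal sketch of what that extraction could look like, assuming the overridden methods move over unchanged from the diff below; the helper name is hypothetical:
   ```
   // Hypothetical helper so the handler method only prepares futures and delegates.
   Call generateDescribeTopicPartitionsCall(
     List<String> topicNamesList,
     Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
     Map<Integer, Node> nodes,
     DescribeTopicsOptions options,
     long now
   ) {
     return new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
         new LeastLoadedNodeProvider()) {
       // createRequest, handleResponse, handleUnsupportedVersionException and
       // handleFailure exactly as in the diff below.
     };
   }
   ```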



##########
clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java:
##########
@@ -40,12 +42,26 @@ public class TopicPartitionInfo {
      * @param replicas the replicas of the partition in the same order as the replica assignment (the preferred replica
      *                 is the head of the list)
      * @param isr the in-sync replicas
+     * @param elr the eligible leader replicas
+     * @param lastKnownElr the last known eligible leader replicas.
      */
+    public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr,
+                              List<Node> elr, List<Node> lastKnownElr) {

Review Comment:
   nit: use this style if you need more than one line
   ```
   Foo(
     int x,
     int y
   ) {
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;

Review Comment:
   Why did we remove this optimization? Seems like we should try to keep the Metadata API code unchanged if possible.
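   For reference, the guard in question is just:
   ```
   if (!topicNamesList.isEmpty()) {
       runnable.call(call, now);
   }
   ```
   which avoided issuing a Metadata request when every requested topic name turned out to be unrepresentable.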



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) {
                 completeAllExceptionally(topicFutures.values(), throwable);
             }
         };
-        if (!topicNamesList.isEmpty()) {
-            runnable.call(call, now);
+        return call;
+    }
+
+    @SuppressWarnings({"MethodLength", "NPathComplexity"})
+    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(
+        final Collection<String> topicNames,
+        DescribeTopicsOptions options
+    ) {
+        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
+        final ArrayList<String> topicNamesList = new ArrayList<>();
+        for (String topicName : topicNames) {
+            if (topicNameIsUnrepresentable(topicName)) {
+                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
+                    topicName + "' cannot be represented in a request."));
+                topicFutures.put(topicName, future);
+            } else if (!topicFutures.containsKey(topicName)) {
+                topicFutures.put(topicName, new KafkaFutureImpl<>());
+                topicNamesList.add(topicName);
+            }
         }
+
+        if (topicNamesList.isEmpty()) {
+            return new HashMap<>(topicFutures);
+        }
+
+        // First, we need to retrieve the node info.
+        DescribeClusterResult clusterResult = describeCluster();
+        Map<Integer, Node> nodes;
+        try {
+            nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node));
+        } catch (InterruptedException | ExecutionException e) {
+            completeAllExceptionally(topicFutures.values(), e.getCause());
+            return new HashMap<>(topicFutures);
+        }
+
+        final long now = time.milliseconds();
+        Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()),
+            new LeastLoadedNodeProvider()) {
+            Map<String, TopicRequest> pendingTopics =
+                topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName))
+                    .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new));
+
+            TopicDescription partiallyFinishedTopicDescription = null;
+
+            @Override
+            DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) {
+                DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData()
+                    .setTopics(new ArrayList<>(pendingTopics.values()))
+                    .setResponsePartitionLimit(options.partitionSizeLimitPerResponse());
+                if (partiallyFinishedTopicDescription != null) {
+                    // If the previous cursor points to partition 0, the cursor will not be set,
+                    // as the first one in the topic list should be the previous cursor topic.
+                    request.setCursor(new DescribeTopicPartitionsRequestData.Cursor()
+                        .setTopicName(partiallyFinishedTopicDescription.name())
+                        .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size())
+                    );
+                }
+                return new DescribeTopicPartitionsRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse;
+                DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor();
+                // The topicDescription for the cursor topic of the current batch.
+                TopicDescription nextTopicDescription = null;
+
+                for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) {
+                    String topicName = topic.name();
+                    Errors error = Errors.forCode(topic.errorCode());
+
+                    KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName);
+
+                    if (error != Errors.NONE) {
+                        future.completeExceptionally(error.exception());
+                        pendingTopics.remove(topicName);
+                        if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                            responseCursor = null;
+                        }
+                        continue;
+                    }
+
+                    TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes);
+
+                    if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) {
+                        // Add the partitions for the cursor topic of the previous batch.
+                        partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions());
+                        continue;
+                    }
+
+                    if (responseCursor != null && responseCursor.topicName().equals(topicName)) {
+                        // In the same batch of results, we may need to handle the partitions for both the previous
+                        // cursor topic and the current cursor topic. Cache the result in nextTopicDescription.
+                        nextTopicDescription = currentTopicDescription;
+                        continue;
+                    }
+
+                    pendingTopics.remove(topicName);
+                    future.complete(currentTopicDescription);
+                }
+
+                if (partiallyFinishedTopicDescription != null &&
+                        (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) {
+                    // We can't simply check nextTopicDescription != null here to close the partiallyFinishedTopicDescription,
+                    // because the responseCursor topic may not show up in the response.
+                    String topicName = partiallyFinishedTopicDescription.name();
+                    topicFutures.get(topicName).complete(partiallyFinishedTopicDescription);
+                    pendingTopics.remove(topicName);
+                    partiallyFinishedTopicDescription = null;
+                }
+                if (nextTopicDescription != null) {
+                    partiallyFinishedTopicDescription = nextTopicDescription;
+                }
+
+                if (!pendingTopics.isEmpty()) {
+                    runnable.call(this, time.milliseconds());
+                }
+            }
+
+            @Override
+            boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                final long now = time.milliseconds();
+                log.warn("The DescribeTopicPartitions API is not supported, 
using Metadata API to describe topics.");
+                runnable.call(generateDescribeTopicsCallWithMetadataAPI(topicNamesList, topicFutures, options, now), now);
+                return false;
+            }

Review Comment:
   If I remember correctly, this will get called before we actually send off the request, right? (Assuming the broker doesn't support the new API)



##########
tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java:
##########
@@ -639,6 +639,25 @@ public void testDescribe(String quorum) throws ExecutionException, InterruptedEx
         assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". Row is: " + rows[0]);
     }
 
+    @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
+    @ValueSource(strings = {"quorum=zk", "quorum=kraft"})
+    public void testDescribeWithDescribeTopicPartitionsApi(String quorum) throws ExecutionException, InterruptedException {
+        TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 3, (short) 2,
+            scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+        );
+        String secondTopicName = "test-2";
+        TestUtils.createTopicWithAdmin(adminClient, secondTopicName, scalaBrokers, scalaControllers, 3, (short) 2,
+            scala.collection.immutable.Map$.MODULE$.empty(), new Properties()
+        );
+
+        String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
+            "--describe", "--partition-size-limit-per-response=1"));
+        String[] rows = output.split("\n");
+        assertEquals(8, rows.length, String.join("\n", rows));
+        assertTrue(rows[2].contains("\tElr"), rows[2]);
+        assertTrue(rows[2].contains("LastKnownElr"), rows[2]);
+    }
+
     @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)

Review Comment:
   I'd like to see an integration test here that utilizes pagination. Something like 3 topics with various numbers of partitions between 50 and 100, then use a page size of 20 or so.
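   Roughly what that could look like, sketched with the helpers already used in this diff; the partition counts, page size, and the "Partition: " row-count assertion are illustrative (java.util.Arrays assumed imported):
   ```
   @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME)
   @ValueSource(strings = {"quorum=zk", "quorum=kraft"})
   public void testDescribeWithPagination(String quorum) throws ExecutionException, InterruptedException {
       int[] partitionCounts = {50, 75, 100};
       for (int i = 0; i < partitionCounts.length; i++) {
           TestUtils.createTopicWithAdmin(adminClient, testTopicName + "-" + i, scalaBrokers, scalaControllers,
               partitionCounts[i], (short) 2, scala.collection.immutable.Map$.MODULE$.empty(), new Properties());
       }
       String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap(
           "--describe", "--partition-size-limit-per-response=20"));
       // Each partition should be printed exactly once, even though the results
       // arrive in pages of 20 partitions.
       long partitionRows = Arrays.stream(output.split("\n")).filter(row -> row.contains("Partition: ")).count();
       assertEquals(50 + 75 + 100, partitionRows);
   }
   ```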



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2256,6 +2384,26 @@ void handleFailure(Throwable throwable) {
         return new HashMap<>(topicFutures);
     }
 
+    private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
+        DescribeTopicPartitionsResponseTopic topic,
+        Map<Integer, Node> nodes
+    ) {
+        List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
+        List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+        for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                partitionInfo.partitionIndex(),
+                nodes.get(partitionInfo.leaderId()),
+                partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList()));
+            partitions.add(topicPartitionInfo);
+        }
+        partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));

Review Comment:
   Do we expect the partitions to be ordered already? Wondering why we need this sort here?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2256,6 +2384,26 @@ void handleFailure(Throwable throwable) {
         return new HashMap<>(topicFutures);
     }
 
+    private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic(
+        DescribeTopicPartitionsResponseTopic topic,
+        Map<Integer, Node> nodes
+    ) {
+        List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions();
+        List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size());
+        for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(
+                partitionInfo.partitionIndex(),
+                nodes.get(partitionInfo.leaderId()),
+                partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()),
+                partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList()));

Review Comment:
   This many similar usages suggest we could use a helper method here. I see we already have a private helper `MetadataResponse#convertToNodeArray`. Maybe in a future PR we can consolidate both to use a new common helper method?
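   For illustration, the shared helper could be as small as something like this (name hypothetical):
   ```
   // Hypothetical common helper replacing the repeated stream().map().collect() chains.
   private static List<Node> toNodeList(List<Integer> brokerIds, Map<Integer, Node> nodes) {
       return brokerIds.stream().map(nodes::get).collect(Collectors.toList());
   }
   ```
   so each call site reduces to e.g. `toNodeList(partitionInfo.replicaNodes(), nodes)`.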



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -799,6 +820,11 @@ public TopicCommandOptions(String[] args) {
                 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
             excludeInternalTopicOpt = parser.accepts("exclude-internal",
                 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+            partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response",
+                "the maximum partition size to be included in one DescribeTopicPartitions response. Only valid if use-describe-topics-api is used")

Review Comment:
   Where is `use-describe-topics-api` defined?



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -2110,26 +2118,12 @@ public DescribeTopicsResult describeTopics(final TopicCollection topics, Describ
         if (topics instanceof TopicIdCollection)
            return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options));
         else if (topics instanceof TopicNameCollection)
-            return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames(), options));
+            return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(((TopicNameCollection) topics).topicNames(), options));
         else
            throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics.");
     }
 
-    private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(final Collection<String> topicNames, DescribeTopicsOptions options) {
-        final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
-        final ArrayList<String> topicNamesList = new ArrayList<>();
-        for (String topicName : topicNames) {
-            if (topicNameIsUnrepresentable(topicName)) {
-                KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>();
-                future.completeExceptionally(new InvalidTopicException("The given topic name '" +
-                    topicName + "' cannot be represented in a request."));
-                topicFutures.put(topicName, future);
-            } else if (!topicFutures.containsKey(topicName)) {
-                topicFutures.put(topicName, new KafkaFutureImpl<>());
-                topicNamesList.add(topicName);
-            }
-        }
-        final long now = time.milliseconds();
+    Call generateDescribeTopicsCallWithMetadataAPI(List<String> topicNamesList, Map<String, KafkaFutureImpl<TopicDescription>> topicFutures, DescribeTopicsOptions options, long now) {

Review Comment:
   nit: break up this long method signature line
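   For example, following the multi-line style suggested earlier in this review:
   ```
   Call generateDescribeTopicsCallWithMetadataAPI(
       List<String> topicNamesList,
       Map<String, KafkaFutureImpl<TopicDescription>> topicFutures,
       DescribeTopicsOptions options,
       long now
   ) {
   ```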



##########
tools/src/main/java/org/apache/kafka/tools/TopicCommand.java:
##########
@@ -799,6 +820,11 @@ public TopicCommandOptions(String[] args) {
                 "if set when creating topics, the action will only execute if 
the topic does not already exist.");
             excludeInternalTopicOpt = parser.accepts("exclude-internal",
                 "exclude internal topics when running list or describe 
command. The internal topics will be listed by default");
+            partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response",
+                "the maximum partition size to be included in one DescribeTopicPartitions response. Only valid if use-describe-topics-api is used")
+                    .withRequiredArg()
+                    .describedAs("maximun # of partitions in one response.")

Review Comment:
   typo. Also, let's type out "number" instead of "#"
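   i.e. something like:
   ```
   .describedAs("maximum number of partitions in one response.")
   ```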



##########
trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java:
##########
@@ -176,7 +176,7 @@ public void testCreatesNotExistingTopics() throws Throwable {
             new TopicDescription(
                 TEST_TOPIC, false,
                 Collections.singletonList(
-                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))),
+                    new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList(), Collections.<Node>emptyList(), Collections.<Node>emptyList()))),

Review Comment:
   Pretty sure we don't need the type here. Java should be able to infer the type from the TopicPartitionInfo argument.
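   i.e. the added line should presumably compile as:
   ```
   new TopicPartitionInfo(0, broker1, singleReplica,
       Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))),
   ```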


