mumrah commented on code in PR #15470: URL: https://github.com/apache/kafka/pull/15470#discussion_r1526390858
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings({"MethodLength", "NPathComplexity"}) Review Comment: Instead of suppressing these warnings, can we refactor the method to be less complex? For example, the Call instance could be defined elsewhere ########## clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java: ########## @@ -40,12 +42,26 @@ public class TopicPartitionInfo { * @param replicas the replicas of the partition in the same order as the replica assignment (the preferred replica * is the head of the list) * @param isr the in-sync replicas + * @param elr the eligible leader replicas + * @param lastKnownElr the last known eligible leader replicas. */ + public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr, + List<Node> elr, List<Node> lastKnownElr) { Review Comment: nit: use this style if you need more than one line ``` Foo( int x, int y ) { ``` ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; Review Comment: Why did we remove this optimization? 
Seems like we should try to keep the Metadata API code unchanged if possible ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2185,9 +2179,143 @@ void handleFailure(Throwable throwable) { completeAllExceptionally(topicFutures.values(), throwable); } }; - if (!topicNamesList.isEmpty()) { - runnable.call(call, now); + return call; + } + + @SuppressWarnings({"MethodLength", "NPathComplexity"}) + private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi( + final Collection<String> topicNames, + DescribeTopicsOptions options + ) { + final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); + final ArrayList<String> topicNamesList = new ArrayList<>(); + for (String topicName : topicNames) { + if (topicNameIsUnrepresentable(topicName)) { + KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new InvalidTopicException("The given topic name '" + + topicName + "' cannot be represented in a request.")); + topicFutures.put(topicName, future); + } else if (!topicFutures.containsKey(topicName)) { + topicFutures.put(topicName, new KafkaFutureImpl<>()); + topicNamesList.add(topicName); + } } + + if (topicNamesList.isEmpty()) { + return new HashMap<>(topicFutures); + } + + // First, we need to retrieve the node info. 
+ DescribeClusterResult clusterResult = describeCluster(); + Map<Integer, Node> nodes; + try { + nodes = clusterResult.nodes().get().stream().collect(Collectors.toMap(Node::id, node -> node)); + } catch (InterruptedException | ExecutionException e) { + completeAllExceptionally(topicFutures.values(), e.getCause()); + return new HashMap<>(topicFutures); + } + + final long now = time.milliseconds(); + Call call = new Call("describeTopicPartitions", calcDeadlineMs(now, options.timeoutMs()), + new LeastLoadedNodeProvider()) { + Map<String, TopicRequest> pendingTopics = + topicNamesList.stream().map(topicName -> new TopicRequest().setName(topicName)) + .collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new)); + + TopicDescription partiallyFinishedTopicDescription = null; + + @Override + DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { + DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(new ArrayList<>(pendingTopics.values())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); + if (partiallyFinishedTopicDescription != null) { + // If the previous cursor points to the partition 0, the cursor will not be set as the first one + // in the topic list should be the previous cursor topic. + request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() + .setTopicName(partiallyFinishedTopicDescription.name()) + .setPartitionIndex(partiallyFinishedTopicDescription.partitions().size()) + ); + } + return new DescribeTopicPartitionsRequest.Builder(request); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; + DescribeTopicPartitionsResponseData.Cursor responseCursor = response.data().nextCursor(); + // The topicDescription for the cursor topic of the current batch. 
+ TopicDescription nextTopicDescription = null; + + for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { + String topicName = topic.name(); + Errors error = Errors.forCode(topic.errorCode()); + + KafkaFutureImpl<TopicDescription> future = topicFutures.get(topicName); + + if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); + pendingTopics.remove(topicName); + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + responseCursor = null; + } + continue; + } + + TopicDescription currentTopicDescription = getTopicDescriptionFromDescribeTopicsResponseTopic(topic, nodes); + + if (partiallyFinishedTopicDescription != null && partiallyFinishedTopicDescription.name().equals(topicName)) { + // Add the partitions for the cursor topic of the previous batch. + partiallyFinishedTopicDescription.partitions().addAll(currentTopicDescription.partitions()); + continue; + } + + if (responseCursor != null && responseCursor.topicName().equals(topicName)) { + // In the same batch of result, it may need to handle the partitions for the previous cursor + // topic and the current cursor topic. Cache the result in the nextTopicDescription. + nextTopicDescription = currentTopicDescription; + continue; + } + + pendingTopics.remove(topicName); + future.complete(currentTopicDescription); + } + + if (partiallyFinishedTopicDescription != null && + (responseCursor == null || !responseCursor.topicName().equals(partiallyFinishedTopicDescription.name()))) { + // We can't simply check nextTopicDescription != null here to close the partiallyFinishedTopicDescription. + // Because the responseCursor topic may not show in the response. 
+ String topicName = partiallyFinishedTopicDescription.name(); + topicFutures.get(topicName).complete(partiallyFinishedTopicDescription); + pendingTopics.remove(topicName); + partiallyFinishedTopicDescription = null; + } + if (nextTopicDescription != null) { + partiallyFinishedTopicDescription = nextTopicDescription; + } + + if (!pendingTopics.isEmpty()) { + runnable.call(this, time.milliseconds()); + } + } + + @Override + boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { + final long now = time.milliseconds(); + log.warn("The DescribeTopicPartitions API is not supported, using Metadata API to describe topics."); + runnable.call(generateDescribeTopicsCallWithMetadataAPI(topicNamesList, topicFutures, options, now), now); + return false; + } Review Comment: If I remember correctly, this will get called before we actually send off the request, right? (Assuming the broker doesn't support the new API) ########## tools/src/test/java/org/apache/kafka/tools/TopicCommandIntegrationTest.java: ########## @@ -639,6 +639,25 @@ public void testDescribe(String quorum) throws ExecutionException, InterruptedEx assertTrue(rows[0].startsWith(String.format("Topic: %s", testTopicName)), "Row does not start with " + testTopicName + ". 
Row is: " + rows[0]); } + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) + @ValueSource(strings = {"quorum=zk", "quorum=kraft"}) + public void testDescribeWithDescribeTopicPartitionsApi(String quorum) throws ExecutionException, InterruptedException { + TestUtils.createTopicWithAdmin(adminClient, testTopicName, scalaBrokers, scalaControllers, 3, (short) 2, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + String secondTopicName = "test-2"; + TestUtils.createTopicWithAdmin(adminClient, secondTopicName, scalaBrokers, scalaControllers, 3, (short) 2, + scala.collection.immutable.Map$.MODULE$.empty(), new Properties() + ); + + String output = captureDescribeTopicStandardOut(buildTopicCommandOptionsWithBootstrap( + "--describe", "--partition-size-limit-per-response=1")); + String[] rows = output.split("\n"); + assertEquals(8, rows.length, String.join("\n", rows)); + assertTrue(rows[2].contains("\tElr"), rows[2]); + assertTrue(rows[2].contains("LastKnownElr"), rows[2]); + } + @ParameterizedTest(name = ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_NAME) Review Comment: I'd like to see an integration test here that utilizes pagination. Something like 3 topics with varying numbers of partitions between 50 and 100. Then use a page size of 20 or so. 
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2256,6 +2384,26 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } + private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic( + DescribeTopicPartitionsResponseTopic topic, + Map<Integer, Node> nodes + ) { + List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions(); + List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size()); + for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( + partitionInfo.partitionIndex(), + nodes.get(partitionInfo.leaderId()), + partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()), + partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()), + partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()), + partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList())); + partitions.add(topicPartitionInfo); + } + partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition)); Review Comment: Do we expect the partitions to be ordered already? Wondering why we need this sort here? 
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2256,6 +2384,26 @@ void handleFailure(Throwable throwable) { return new HashMap<>(topicFutures); } + private TopicDescription getTopicDescriptionFromDescribeTopicsResponseTopic( + DescribeTopicPartitionsResponseTopic topic, + Map<Integer, Node> nodes + ) { + List<DescribeTopicPartitionsResponsePartition> partitionInfos = topic.partitions(); + List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size()); + for (DescribeTopicPartitionsResponsePartition partitionInfo : partitionInfos) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( + partitionInfo.partitionIndex(), + nodes.get(partitionInfo.leaderId()), + partitionInfo.replicaNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()), + partitionInfo.isrNodes().stream().map(id -> nodes.get(id)).collect(Collectors.toList()), + partitionInfo.eligibleLeaderReplicas().stream().map(id -> nodes.get(id)).collect(Collectors.toList()), + partitionInfo.lastKnownElr().stream().map(id -> nodes.get(id)).collect(Collectors.toList())); Review Comment: This many similar usages suggest we could use a helper method here. I see we already have a private helper `MetadataResponse#convertToNodeArray`. Maybe in a future PR we can consolidate both to use a new common helper method? ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -799,6 +820,11 @@ public TopicCommandOptions(String[] args) { "if set when creating topics, the action will only execute if the topic does not already exist."); excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default"); + partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response", + "the maximum partition size to be included in one DescribeTopicPartitions response. 
Only valid if use-describe-topics-api is used") Review Comment: Where is `use-describe-topics-api` defined? ########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -2110,26 +2118,12 @@ public DescribeTopicsResult describeTopics(final TopicCollection topics, Describ if (topics instanceof TopicIdCollection) return DescribeTopicsResult.ofTopicIds(handleDescribeTopicsByIds(((TopicIdCollection) topics).topicIds(), options)); else if (topics instanceof TopicNameCollection) - return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNames(((TopicNameCollection) topics).topicNames(), options)); + return DescribeTopicsResult.ofTopicNames(handleDescribeTopicsByNamesWithDescribeTopicPartitionsApi(((TopicNameCollection) topics).topicNames(), options)); else throw new IllegalArgumentException("The TopicCollection: " + topics + " provided did not match any supported classes for describeTopics."); } - private Map<String, KafkaFuture<TopicDescription>> handleDescribeTopicsByNames(final Collection<String> topicNames, DescribeTopicsOptions options) { - final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); - final ArrayList<String> topicNamesList = new ArrayList<>(); - for (String topicName : topicNames) { - if (topicNameIsUnrepresentable(topicName)) { - KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); - future.completeExceptionally(new InvalidTopicException("The given topic name '" + - topicName + "' cannot be represented in a request.")); - topicFutures.put(topicName, future); - } else if (!topicFutures.containsKey(topicName)) { - topicFutures.put(topicName, new KafkaFutureImpl<>()); - topicNamesList.add(topicName); - } - } - final long now = time.milliseconds(); + Call generateDescribeTopicsCallWithMetadataAPI(List<String> topicNamesList, Map<String, KafkaFutureImpl<TopicDescription>> topicFutures, DescribeTopicsOptions options, long now) { Review Comment: nit: 
break up this long method signature line ########## tools/src/main/java/org/apache/kafka/tools/TopicCommand.java: ########## @@ -799,6 +820,11 @@ public TopicCommandOptions(String[] args) { "if set when creating topics, the action will only execute if the topic does not already exist."); excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default"); + partitionSizeLimitPerResponseOpt = parser.accepts("partition-size-limit-per-response", + "the maximum partition size to be included in one DescribeTopicPartitions response. Only valid if use-describe-topics-api is used") + .withRequiredArg() + .describedAs("maximun # of partitions in one response.") Review Comment: typo. Also, let's type out "number" instead of "#" ########## trogdor/src/test/java/org/apache/kafka/trogdor/common/WorkerUtilsTest.java: ########## @@ -176,7 +176,7 @@ public void testCreatesNotExistingTopics() throws Throwable { new TopicDescription( TEST_TOPIC, false, Collections.singletonList( - new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList()))), + new TopicPartitionInfo(0, broker1, singleReplica, Collections.<Node>emptyList(), Collections.<Node>emptyList(), Collections.<Node>emptyList()))), Review Comment: Pretty sure we don't need the type here. Java should be able to infer the type from the TopicPartitionInfo argument. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org