artemlivshits commented on code in PR #14612:
URL: https://github.com/apache/kafka/pull/14612#discussion_r1449897783


##########
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+    MetadataCache metadataCache;
+    AuthHelper authHelper;
+    KafkaConfig config;
+
+    public DescribeTopicPartitionsRequestHandler(
+        MetadataCache metadataCache,
+        AuthHelper authHelper,
+        KafkaConfig config
+    ) {
+        this.metadataCache = metadataCache;
+        this.authHelper = authHelper;
+        this.config = config;
+    }
+
+    public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+        if (metadataCache instanceof ZkMetadataCache) {
+            throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request");
+        }
+        KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) metadataCache;
+
+        DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+        Set<String> topics = new HashSet<>();
+        boolean fetchAllTopics = request.topics().isEmpty();
+        DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+        if (fetchAllTopics) {
+            if (cursor != null) {
+                // Includes the cursor topic in case the cursor topic does not exist anymore.
+                topics.add(cursor.topicName());

Review Comment:
   Instead of doing this, can we get all topics that are >= cursor and then check (after sorting) if the cursor topic is present, and reset the partition index to 0 if it's not? This would also handle the unauthorized case.
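
   A minimal sketch of that shape (the helper name and `sortedTopics` are hypothetical, for illustration only):

   ```java
   // Decide where the first described topic should start, given the topic names
   // that survived the ">= cursor" filter, sorting, and authorization.
   static int resolveFirstPartitionIndex(
       List<String> sortedTopics,
       DescribeTopicPartitionsRequestData.Cursor cursor
   ) {
       if (cursor != null
               && !sortedTopics.isEmpty()
               && sortedTopics.get(0).equals(cursor.topicName())) {
           // The cursor topic still exists and is authorized: resume from it.
           return cursor.partitionIndex();
       }
       // The cursor topic is gone (deleted or unauthorized): start from
       // partition 0 instead of silently skipping partitions of the next topic.
       return 0;
   }
   ```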



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -189,6 +255,86 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
 
+  /**
+   * Get the topic metadata for the given topics.
+   *
+   * The quota is used to limit the number of partitions to return. The NextTopicPartition field points to the first
+   * partition that can't be returned due to the limit.
+   * If no partition of a topic can be returned because the limit has been reached, the topic will not be included
+   * in the response.
+   *
+   * Note that the topics should be sorted in alphabetical order. The topics in the DescribeTopicPartitionsResponseData
+   * will also be sorted in alphabetical order.
+   *
+   * @param topics                        The set of topics and their corresponding first partition id to fetch.
+   * @param listenerName                  The listener name.
+   * @param firstTopicPartitionStartIndex The start partition index for the first topic.
+   * @param maximumNumberOfPartitions     The maximum number of partitions to return.
+   */
+  def getTopicMetadataForDescribeTopicResponse(
+    topics: Seq[String],
+    listenerName: ListenerName,
+    firstTopicPartitionStartIndex: Int,
+    maximumNumberOfPartitions: Int
+  ): DescribeTopicPartitionsResponseData = {
+    val image = _currentImage
+    var remaining = maximumNumberOfPartitions
+    var startIndex = firstTopicPartitionStartIndex
+    val result = new DescribeTopicPartitionsResponseData()
+    topics.foreach { topicName =>
+      if (remaining > 0) {
+        val partitionResponse = getPartitionMetadataForDescribeTopicResponse(image, topicName, listenerName)
+        partitionResponse.map( partitions => {
+          val upperIndex = startIndex + remaining
+          val response = new DescribeTopicPartitionsResponseTopic()
+            .setErrorCode(Errors.NONE.code)
+            .setName(topicName)
+            .setTopicId(Option(image.topics().getTopic(topicName).id()).getOrElse(Uuid.ZERO_UUID))
+            .setIsInternal(Topic.isInternal(topicName))
+            .setPartitions(partitions.filter(partition => {
+              partition.partitionIndex() >= startIndex && partition.partitionIndex() < upperIndex
+            }).asJava)
+          remaining -= response.partitions().size()
+          result.topics().add(response)
+
+          if (upperIndex < partitions.size) {
+            result.setNextCursor(new Cursor()
+              .setTopicName(topicName)
+              .setPartitionIndex(upperIndex)
+            )
+            remaining = -1
+          }
+        })
+
+        // start index only applies to the first topic. Reset it here.
+        startIndex = 0
+
+        if (!partitionResponse.isDefined) {
+          val error = try {
+            Topic.validate(topicName)
+            Errors.UNKNOWN_TOPIC_OR_PARTITION

Review Comment:
   Yeah, but the error is kind of unexpected -- if the user didn't specify a topic in the first place, why would they get an error about a topic that doesn't exist?
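
   One way to avoid that (a sketch only, in Java for consistency with the handler code; it assumes the fetch-all flag is plumbed down to where the error is built):

   ```java
   // Only report UNKNOWN_TOPIC_OR_PARTITION for topics the caller explicitly
   // named; `topicExists` and `fetchAllTopics` are assumed to be in scope here.
   if (!topicExists) {
       if (!fetchAllTopics) {
           result.topics().add(new DescribeTopicPartitionsResponseTopic()
               .setName(topicName)
               .setTopicId(Uuid.ZERO_UUID)
               .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
       }
       // In fetch-all mode, a cursor topic that no longer exists is just skipped.
   }
   ```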



##########
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+    MetadataCache metadataCache;
+    AuthHelper authHelper;
+    KafkaConfig config;
+
+    public DescribeTopicPartitionsRequestHandler(
+        MetadataCache metadataCache,
+        AuthHelper authHelper,
+        KafkaConfig config
+    ) {
+        this.metadataCache = metadataCache;
+        this.authHelper = authHelper;
+        this.config = config;
+    }
+
+    public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+        if (metadataCache instanceof ZkMetadataCache) {
+            throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request");
+        }
+        KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) metadataCache;
+
+        DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+        Set<String> topics = new HashSet<>();
+        boolean fetchAllTopics = request.topics().isEmpty();
+        DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+        if (fetchAllTopics) {
+            if (cursor != null) {
+                // Includes the cursor topic in case the cursor topic does not exist anymore.
+                topics.add(cursor.topicName());
+                kRaftMetadataCache.getAllTopicsAfterTopic(cursor.topicName()).foreach(topic -> topics.add(topic));

Review Comment:
   Why not just filter from all topics here? getAllTopicsAfterTopic creates a new set just to copy it here.
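
   For example (sketch, mirroring the PR's existing foreach style):

   ```java
   // Filter the existing all-topics set in place instead of having the cache
   // materialize a "topics after X" copy that is immediately copied again.
   kRaftMetadataCache.getAllTopics().foreach(topic -> {
       if (topic.compareTo(cursor.topicName()) >= 0) {
           topics.add(topic);
       }
       return null; // return value for the scala.Function1 adapter
   });
   ```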



##########
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##########
@@ -141,17 +144,33 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w
     }
   }
 
+  /**
+   * Return topic partition metadata for the given topic, listener and index range. Also return a boolean
+   * indicating whether there are more partitions with an index equal to or larger than the upper index.
+   *
+   * @param image                       The metadata image.
+   * @param topicName                   The name of the topic.
+   * @param listenerName                The listener name.
+   * @param startIndex                  The smallest index of the partitions to be included in the result.
+   * @param upperIndex                  The upper limit of the index of the partitions to be included in the result.
+   *                                    Note that the upper index can be larger than the largest partition index in
+   *                                    this topic.
+   * @return                            A collection of topic partition metadata and whether there are more partitions.
+   */
   private def getPartitionMetadataForDescribeTopicResponse(
     image: MetadataImage,
     topicName: String,
-    listenerName: ListenerName
-  ): Option[List[DescribeTopicPartitionsResponsePartition]] = {
+    listenerName: ListenerName,
+    startIndex: Int,
+    upperIndex: Int
+  ): (Option[List[DescribeTopicPartitionsResponsePartition]], Boolean) = {
     Option(image.topics().getTopic(topicName)) match {
-      case None => None
+      case None => (None, false)
       case Some(topic) => {
-        val partitions = Some(topic.partitions().entrySet().asScala.map { entry =>
-          val partitionId = entry.getKey
-          val partition = entry.getValue
+        val result = new ListBuffer[DescribeTopicPartitionsResponsePartition]()
+        val endIndex = upperIndex.min(topic.partitions().size())
+        for (partitionId <- startIndex until endIndex) {
+          val partition = topic.partitions().get(partitionId)

Review Comment:
   What if the partition doesn't exist?
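
   For reference, `topic.partitions()` is keyed by partition id, so a gap in the ids would make `get` return null in this loop. A defensive guard might look like this (Java sketch, since the other examples use Java; `PartitionRegistration` stands in for the image's partition record type):

   ```java
   for (int partitionId = startIndex; partitionId < endIndex; partitionId++) {
       PartitionRegistration partition = topic.partitions().get(partitionId);
       if (partition == null) {
           continue; // skip the gap instead of dereferencing null below
       }
       // ... build the DescribeTopicPartitionsResponsePartition from `partition`
   }
   ```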



##########
core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.handlers;
+
+import kafka.network.RequestChannel;
+import kafka.server.AuthHelper;
+import kafka.server.KafkaConfig;
+import kafka.server.MetadataCache;
+import kafka.server.metadata.KRaftMetadataCache;
+import kafka.server.metadata.ZkMetadataCache;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.message.DescribeTopicPartitionsRequestData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition;
+import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponseTopic;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeTopicPartitionsRequest;
+import org.apache.kafka.common.resource.Resource;
+import scala.collection.JavaConverters;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.resource.ResourceType.TOPIC;
+
+public class DescribeTopicPartitionsRequestHandler {
+    MetadataCache metadataCache;
+    AuthHelper authHelper;
+    KafkaConfig config;
+
+    public DescribeTopicPartitionsRequestHandler(
+        MetadataCache metadataCache,
+        AuthHelper authHelper,
+        KafkaConfig config
+    ) {
+        this.metadataCache = metadataCache;
+        this.authHelper = authHelper;
+        this.config = config;
+    }
+
+    public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(RequestChannel.Request abstractRequest) {
+        if (metadataCache instanceof ZkMetadataCache) {
+            throw new InvalidRequestException("ZK cluster does not handle DescribeTopicPartitions request");
+        }
+        KRaftMetadataCache kRaftMetadataCache = (KRaftMetadataCache) metadataCache;
+
+        DescribeTopicPartitionsRequestData request = ((DescribeTopicPartitionsRequest) abstractRequest.loggableRequest()).data();
+        Set<String> topics = new HashSet<>();
+        boolean fetchAllTopics = request.topics().isEmpty();
+        DescribeTopicPartitionsRequestData.Cursor cursor = request.cursor();
+        if (fetchAllTopics) {
+            if (cursor != null) {
+                // Includes the cursor topic in case the cursor topic does not exist anymore.
+                topics.add(cursor.topicName());
+                kRaftMetadataCache.getAllTopicsAfterTopic(cursor.topicName()).foreach(topic -> topics.add(topic));
+            } else {
+                kRaftMetadataCache.getAllTopics().foreach(topic -> topics.add(topic));
+            }
+        } else {
+            request.topics().forEach(topic -> {
+                String topicName = topic.name();
+                if (cursor == null || topicName.compareTo(cursor.topicName()) >= 0) {
+                    topics.add(topic.name());
+                }
+            });
+
+            if (cursor != null && !topics.contains(cursor.topicName())) {
+                // The topic in cursor must be included in the topic list if provided.
+                throw new InvalidRequestException("DescribeTopicPartitionsRequest topic list should contain the cursor topic: " + cursor.topicName());
+            }
+        }
+
+        // Do not disclose the existence of topics unauthorized for Describe, so we have not even checked whether they exist.
+        Set<DescribeTopicPartitionsResponseTopic> unauthorizedForDescribeTopicMetadata = new HashSet<>();
+
+        Iterator<String> authorizedTopics = topics.stream().filter(topicName -> {
+            boolean isAuthorized = authHelper.authorize(
+                abstractRequest.context(), DESCRIBE, TOPIC, topicName, true, true, 1);
+            if (!fetchAllTopics && !isAuthorized) {
+                // We should not return a topicId on an unauthorized error, so we return the zero UUID.
+                unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic(
+                    Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, Collections.emptyList())
+                );
+            }
+            return isAuthorized;
+        }).sorted().iterator();
+
+        int firstPartitionId = cursor == null ? 0 : cursor.partitionIndex();

Review Comment:
   If the cursor topic is not authorized, we'd skip partitions in the next authorized topic.
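
   Concretely (hypothetical names; `sortedAuthorizedTopics` would be the filtered, sorted topic list materialized above): if the cursor is `{topicName: "bar", partitionIndex: 5}` and "bar" is unauthorized, the first authorized topic, say "baz", would start at partition 5 and its partitions 0 through 4 would never be returned. A guard like the one below (the same check as the helper sketched in the first comment) avoids that:

   ```java
   // Honor the cursor's partition index only when the first topic we are about
   // to describe really is the cursor topic itself.
   int firstPartitionId =
       (cursor != null
           && !sortedAuthorizedTopics.isEmpty()
           && sortedAuthorizedTopics.get(0).equals(cursor.topicName()))
       ? cursor.partitionIndex()
       : 0;
   ```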


