ijuma commented on a change in pull request #10584:
URL: https://github.com/apache/kafka/pull/10584#discussion_r649189502



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1135,6 +1135,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
+    if (metadataRequest.version() >= 10 && !metadataRequest.isAllTopics) {

Review comment:
       Do we need to check the version at all?

##########
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##########
@@ -234,6 +235,32 @@ class MetadataRequestTest extends AbstractMetadataRequestTest {
     }
   }
 
+  @Test
+  def testInvalidMetadataRequestReturnsError(): Unit = {

Review comment:
       Do we need a full-blown, slow request test for this, or can we rely on unit tests only?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
##########
@@ -92,6 +93,15 @@ public MetadataRequest build(short version) {
             if (!data.allowAutoTopicCreation() && version < 4)
                 throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
                         "allowAutoTopicCreation field");
+            if (version >= 10) {

Review comment:
       Is this version check needed at all?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1142,6 +1142,19 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
+    // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions.
+    if (metadataRequest.version() >= 10 && !metadataRequest.isAllTopics) {
+      metadataRequest.data().topics().forEach{ topic =>
+        // If null, set to the empty string, since the response does not allow null.
+        if (topic.name() == null) {
+          topic.setName("")
+          throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version()}")
+        } else if (topic.topicId() != Uuid.ZERO_UUID) {
+          throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version()}")
+        }
+      }
+    }

Review comment:
       What I mean is that this logic could exist in the request class and you 
can then call the method from here. That way it's much easier to test.
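
       As a rough illustration of the suggestion above, the check could look something like the sketch below. This is not the actual Kafka code: `MetadataTopicValidation`, `Topic`, `validateTopics`, and the local `InvalidRequestException` and `ZERO_UUID` are hypothetical stand-ins so the example compiles on its own, and it assumes a null topic list means "all topics".

```java
import java.util.List;
import java.util.UUID;

// Hypothetical, self-contained sketch; these are stand-ins, not Kafka's real classes.
final class MetadataTopicValidation {

    // Stand-in for Uuid.ZERO_UUID (assumption: "no topic ID" is the all-zero UUID).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Stand-in for a single topic entry in the request data.
    static final class Topic {
        final String name;
        final UUID topicId;

        Topic(String name, UUID topicId) {
            this.name = name;
            this.topicId = topicId;
        }
    }

    // Stand-in for the exception type thrown in the diff under review.
    static final class InvalidRequestException extends RuntimeException {
        InvalidRequestException(String message) {
            super(message);
        }
    }

    // Rejects null topic names and non-zero topic IDs.
    // Assumption: a null list means "all topics", so there is nothing to validate.
    static void validateTopics(List<Topic> topics, short version) {
        if (topics == null) {
            return;
        }
        for (Topic topic : topics) {
            if (topic.name == null) {
                throw new InvalidRequestException(
                    "Topic name can not be null for version " + version);
            }
            if (!ZERO_UUID.equals(topic.topicId)) {
                throw new InvalidRequestException(
                    "Topic IDs are not supported in requests for version " + version);
            }
        }
    }
}
```

       With the logic in one plain method like this, a unit test can call it directly with a handful of topic lists instead of going through a full request test, and KafkaApis would only need a single call to it.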



