This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f775be0  MINOR: Handle Metadata v0 all topics requests during parsing (#6300)
f775be0 is described below

commit f775be0514dc6a512245fd244e06e0aa11cc45c4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Feb 21 13:11:36 2019 -0800

    MINOR: Handle Metadata v0 all topics requests during parsing (#6300)
    
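    Use of `MetadataRequest.isAllTopics` is not consistently defined for all
    versions of the API. In v0 an empty topic list means "all topics" (there is
    no way to specify "no topics"), yet `isAllTopics` evaluates to false for
    such a request. This patch makes the behavior consistent for all versions
    by translating the v0 empty list into "all topics" during parsing.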
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../kafka/common/requests/MetadataRequest.java     | 10 ++--
 .../kafka/common/requests/MetadataRequestTest.java | 55 ++++++++++++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 17 ++-----
 3 files changed, 66 insertions(+), 16 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 89a6e69..3f12f1d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -146,9 +146,13 @@ public class MetadataRequest extends AbstractRequest {
         super(ApiKeys.METADATA, version);
         Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
         if (topicArray != null) {
-            topics = new ArrayList<>();
-            for (Object topicObj: topicArray) {
-                topics.add((String) topicObj);
+            if (topicArray.length == 0 && version == 0) {
+                topics = null;
+            } else {
+                topics = new ArrayList<>();
+                for (Object topicObj: topicArray) {
+                    topics.add((String) topicObj);
+                }
             }
         } else {
             topics = null;
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
new file mode 100644
index 0000000..207cac7
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class MetadataRequestTest {
+
+    @Test
+    public void testEmptyMeansAllTopicsV0() {
+        Struct rawRequest = new Struct(MetadataRequest.schemaVersions()[0]);
+        rawRequest.set("topics", new Object[0]);
+        MetadataRequest parsedRequest = new MetadataRequest(rawRequest, 
(short) 0);
+        assertTrue(parsedRequest.isAllTopics());
+        assertNull(parsedRequest.topics());
+    }
+
+    @Test
+    public void testEmptyMeansEmptyForVersionsAboveV0() {
+        for (int i = 1; i < MetadataRequest.schemaVersions().length; i++) {
+            Schema schema = MetadataRequest.schemaVersions()[i];
+            Struct rawRequest = new Struct(schema);
+            rawRequest.set("topics", new Object[0]);
+            if (rawRequest.hasField("allow_auto_topic_creation"))
+                rawRequest.set("allow_auto_topic_creation", true);
+            MetadataRequest parsedRequest = new MetadataRequest(rawRequest, 
(short) i);
+            assertFalse(parsedRequest.isAllTopics());
+            assertEquals(Collections.emptyList(), parsedRequest.topics());
+        }
+    }
+
+}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d1b5bb8..c37edf6 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -992,19 +992,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val metadataRequest = request.body[MetadataRequest]
     val requestVersion = request.header.apiVersion
 
-    val topics =
-      // Handle old metadata request logic. Version 0 has no way to specify 
"no topics".
-      if (requestVersion == 0) {
-        if (metadataRequest.topics() == null || metadataRequest.topics.isEmpty)
-          metadataCache.getAllTopics()
-        else
-          metadataRequest.topics.asScala.toSet
-      } else {
-        if (metadataRequest.isAllTopics)
-          metadataCache.getAllTopics()
-        else
-          metadataRequest.topics.asScala.toSet
-      }
+    val topics = if (metadataRequest.isAllTopics)
+      metadataCache.getAllTopics()
+    else
+      metadataRequest.topics.asScala.toSet
 
     var (authorizedTopics, unauthorizedForDescribeTopics) =
       topics.partition(topic => authorize(request.session, Describe, 
Resource(Topic, topic, LITERAL)))

Reply via email to