This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f775be0 MINOR: Handle Metadata v0 all topics requests during parsing (#6300) f775be0 is described below commit f775be0514dc6a512245fd244e06e0aa11cc45c4 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Thu Feb 21 13:11:36 2019 -0800 MINOR: Handle Metadata v0 all topics requests during parsing (#6300) Use of `MetadataRequest.isAllTopics` is not consistently defined for all versions of the api. For v0, it evaluates to false. This patch makes the behavior consistent for all versions. Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../kafka/common/requests/MetadataRequest.java | 10 ++-- .../kafka/common/requests/MetadataRequestTest.java | 55 ++++++++++++++++++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 17 ++----- 3 files changed, 66 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 89a6e69..3f12f1d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -146,9 +146,13 @@ public class MetadataRequest extends AbstractRequest { super(ApiKeys.METADATA, version); Object[] topicArray = struct.getArray(TOPICS_KEY_NAME); if (topicArray != null) { - topics = new ArrayList<>(); - for (Object topicObj: topicArray) { - topics.add((String) topicObj); + if (topicArray.length == 0 && version == 0) { + topics = null; + } else { + topics = new ArrayList<>(); + for (Object topicObj: topicArray) { + topics.add((String) topicObj); + } } } else { topics = null; diff --git a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java new file mode 100644 index 0000000..207cac7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class MetadataRequestTest { + + @Test + public void testEmptyMeansAllTopicsV0() { + Struct rawRequest = new Struct(MetadataRequest.schemaVersions()[0]); + rawRequest.set("topics", new Object[0]); + MetadataRequest parsedRequest = new MetadataRequest(rawRequest, (short) 0); + assertTrue(parsedRequest.isAllTopics()); + assertNull(parsedRequest.topics()); + } + + @Test + public void testEmptyMeansEmptyForVersionsAboveV0() { + for (int i = 1; i < MetadataRequest.schemaVersions().length; i++) { + Schema schema = MetadataRequest.schemaVersions()[i]; + Struct rawRequest = new Struct(schema); + rawRequest.set("topics", new Object[0]); + if (rawRequest.hasField("allow_auto_topic_creation")) + rawRequest.set("allow_auto_topic_creation", true); + MetadataRequest parsedRequest = new MetadataRequest(rawRequest, (short) i); + assertFalse(parsedRequest.isAllTopics()); + assertEquals(Collections.emptyList(), parsedRequest.topics()); + } + } + +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d1b5bb8..c37edf6 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -992,19 +992,10 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataRequest = request.body[MetadataRequest] val requestVersion = request.header.apiVersion - val topics = - // Handle old metadata request logic. Version 0 has no way to specify "no topics". - if (requestVersion == 0) { - if (metadataRequest.topics() == null || metadataRequest.topics.isEmpty) - metadataCache.getAllTopics() - else - metadataRequest.topics.asScala.toSet - } else { - if (metadataRequest.isAllTopics) - metadataCache.getAllTopics() - else - metadataRequest.topics.asScala.toSet - } + val topics = if (metadataRequest.isAllTopics) + metadataCache.getAllTopics() + else + metadataRequest.topics.asScala.toSet var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(topic => authorize(request.session, Describe, Resource(Topic, topic, LITERAL)))