Repository: kafka Updated Branches: refs/heads/trunk ca7f3dab0 -> cdcbd9283
KAFKA-6210; IllegalArgumentException if 1.0.0 is used for inter.broker.protocol.version or log.message.format.version Added unit test for ApiVersion and testApiVersions from Scala to Java. Author: Ismael Juma <[email protected]> Reviewers: Rajini Sivaram <[email protected]> Closes #4220 from ijuma/kafka-6210-iae-if-1.0.0-inter-broker-protocol-version Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cdcbd928 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cdcbd928 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cdcbd928 Branch: refs/heads/trunk Commit: cdcbd9283ddea0207dad7a9f534a534a463f01f4 Parents: ca7f3da Author: Ismael Juma <[email protected]> Authored: Thu Nov 16 01:21:11 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Thu Nov 16 01:21:11 2017 +0000 ---------------------------------------------------------------------- .../requests/ApiVersionsResponseTest.java | 36 ++++++++- core/src/main/scala/kafka/api/ApiVersion.scala | 9 ++- .../scala/unit/kafka/api/ApiVersionTest.scala | 77 ++++++++++++++++++++ .../unit/kafka/server/ApiVersionsTest.scala | 51 ------------- 4 files changed, 115 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cdcbd928/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 6d488cd..2b526d1 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -22,33 +22,61 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.utils.Utils; import org.junit.Test; +import java.util.Collection; import java.util.HashSet; import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class ApiVersionsResponseTest { @Test - public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() throws Exception { + public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() { final ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(10, RecordBatch.MAGIC_VALUE_V1); verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); assertEquals(10, response.throttleTimeMs()); } @Test - public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() throws Exception { + public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() { assertEquals(apiKeysInResponse(ApiVersionsResponse.defaultApiVersionsResponse()), Utils.mkSet(ApiKeys.values())); } @Test - public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() throws Exception { + public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() { ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE); assertEquals(Utils.mkSet(ApiKeys.values()), apiKeysInResponse(response)); assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); } + + @Test + public void shouldHaveCorrectDefaultApiVersionsResponse() { + Collection<ApiVersionsResponse.ApiVersion> apiVersions = ApiVersionsResponse.defaultApiVersionsResponse().apiVersions(); + assertEquals("API versions for all API keys must be maintained.", apiVersions.size(), ApiKeys.values().length); + + for (ApiKeys key : ApiKeys.values()) { + ApiVersionsResponse.ApiVersion version = ApiVersionsResponse.defaultApiVersionsResponse().apiVersion(key.id); + assertNotNull("Could not find ApiVersion for API " + key.name, version); + assertEquals("Incorrect min version for Api " + key.name, version.minVersion, key.oldestVersion()); + assertEquals("Incorrect max version for Api " + key.name, version.maxVersion, key.latestVersion()); + + // Check if versions less than min version are indeed set as null, i.e., deprecated. + for (int i = 0; i < version.minVersion; ++i) { + assertNull("Request version " + i + " for API " + version.apiKey + " must be null", key.requestSchemas[i]); + assertNull("Response version " + i + " for API " + version.apiKey + " must be null", key.responseSchemas[i]); + } + + // Check if versions between min and max versions are non null, i.e., valid. + for (int i = version.minVersion; i <= version.maxVersion; ++i) { + assertNotNull("Request version " + i + " for API " + version.apiKey + " must not be null", key.requestSchemas[i]); + assertNotNull("Response version " + i + " for API " + version.apiKey + " must not be null", key.responseSchemas[i]); + } + } + } private void verifyApiKeysForMagic(final ApiVersionsResponse response, final byte maxMagic) { for (final ApiVersionsResponse.ApiVersion version : response.apiVersions()) { @@ -65,4 +93,4 @@ public class ApiVersionsResponseTest { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/cdcbd928/core/src/main/scala/kafka/api/ApiVersion.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala index a72d2e0..e509fc5 100644 --- a/core/src/main/scala/kafka/api/ApiVersion.scala +++ b/core/src/main/scala/kafka/api/ApiVersion.scala @@ -77,9 +77,12 @@ object ApiVersion { private val versionPattern = "\\.".r - def apply(version: String): ApiVersion = - versionNameMap.getOrElse(versionPattern.split(version).slice(0, 3).mkString("."), - throw new IllegalArgumentException(s"Version `$version` is not a valid version")) + def apply(version: String): ApiVersion = { + val versionsSeq = versionPattern.split(version) + val numSegments = if (version.startsWith("0.")) 3 else 2 + val key = versionsSeq.take(numSegments).mkString(".") + versionNameMap.getOrElse(key, throw new IllegalArgumentException(s"Version `$version` is not a valid version")) + } def latestVersion = versionNameMap.values.max http://git-wip-us.apache.org/repos/asf/kafka/blob/cdcbd928/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala new file mode 100644 index 0000000..6fc6974 --- /dev/null +++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import org.junit.Test +import org.junit.Assert._ + +class ApiVersionTest { + + @Test + def testApply(): Unit = { + assertEquals(KAFKA_0_8_0, ApiVersion("0.8.0")) + assertEquals(KAFKA_0_8_0, ApiVersion("0.8.0.0")) + assertEquals(KAFKA_0_8_0, ApiVersion("0.8.0.1")) + + assertEquals(KAFKA_0_8_1, ApiVersion("0.8.1")) + assertEquals(KAFKA_0_8_1, ApiVersion("0.8.1.0")) + assertEquals(KAFKA_0_8_1, ApiVersion("0.8.1.1")) + + assertEquals(KAFKA_0_8_2, ApiVersion("0.8.2")) + assertEquals(KAFKA_0_8_2, ApiVersion("0.8.2.0")) + assertEquals(KAFKA_0_8_2, ApiVersion("0.8.2.1")) + + assertEquals(KAFKA_0_9_0, ApiVersion("0.9.0")) + assertEquals(KAFKA_0_9_0, ApiVersion("0.9.0.0")) + assertEquals(KAFKA_0_9_0, ApiVersion("0.9.0.1")) + + assertEquals(KAFKA_0_10_0_IV0, ApiVersion("0.10.0-IV0")) + + assertEquals(KAFKA_0_10_0_IV1, ApiVersion("0.10.0")) + assertEquals(KAFKA_0_10_0_IV1, ApiVersion("0.10.0.0")) + assertEquals(KAFKA_0_10_0_IV1, ApiVersion("0.10.0.0-IV0")) + assertEquals(KAFKA_0_10_0_IV1, ApiVersion("0.10.0.1")) + + assertEquals(KAFKA_0_10_1_IV0, ApiVersion("0.10.1-IV0")) + assertEquals(KAFKA_0_10_1_IV1, ApiVersion("0.10.1-IV1")) + + assertEquals(KAFKA_0_10_1_IV2, ApiVersion("0.10.1")) + assertEquals(KAFKA_0_10_1_IV2, ApiVersion("0.10.1.0")) + assertEquals(KAFKA_0_10_1_IV2, ApiVersion("0.10.1-IV2")) + assertEquals(KAFKA_0_10_1_IV2, ApiVersion("0.10.1.1")) + + assertEquals(KAFKA_0_10_2_IV0, ApiVersion("0.10.2")) + assertEquals(KAFKA_0_10_2_IV0, ApiVersion("0.10.2.0")) + assertEquals(KAFKA_0_10_2_IV0, ApiVersion("0.10.2-IV0")) + assertEquals(KAFKA_0_10_2_IV0, ApiVersion("0.10.2.1")) + + assertEquals(KAFKA_0_11_0_IV0, ApiVersion("0.11.0-IV0")) + assertEquals(KAFKA_0_11_0_IV1, ApiVersion("0.11.0-IV1")) + + assertEquals(KAFKA_0_11_0_IV2, ApiVersion("0.11.0")) + assertEquals(KAFKA_0_11_0_IV2, ApiVersion("0.11.0.0")) + assertEquals(KAFKA_0_11_0_IV2, ApiVersion("0.11.0-IV2")) + assertEquals(KAFKA_0_11_0_IV2, ApiVersion("0.11.0.1")) + + assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0")) + assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.0")) + assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.0-IV0")) + assertEquals(KAFKA_1_0_IV0, ApiVersion("1.0.1")) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/cdcbd928/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala deleted file mode 100644 index 3ba7cd5..0000000 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import org.apache.kafka.common.requests.ApiVersionsResponse -import org.apache.kafka.common.protocol.ApiKeys -import org.junit.Assert._ -import org.junit.Test - -class ApiVersionsTest { - - @Test - def testApiVersions(): Unit = { - val apiVersions = ApiVersionsResponse.defaultApiVersionsResponse().apiVersions - assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length) - - for (key <- ApiKeys.values) { - val version = ApiVersionsResponse.defaultApiVersionsResponse().apiVersion(key.id) - assertNotNull(s"Could not find ApiVersion for API ${key.name}", version) - assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, key.oldestVersion) - assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, key.latestVersion) - - // Check if versions less than min version are indeed set as null, i.e., deprecated. - for (i <- 0 until version.minVersion) { - assertNull(s"Request version $i for API ${version.apiKey} must be null.", key.requestSchemas(i)) - assertNull(s"Response version $i for API ${version.apiKey} must be null.", key.responseSchemas(i)) - } - - // Check if versions between min and max versions are non null, i.e., valid. - for (i <- version.minVersion.toInt to version.maxVersion) { - assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", key.requestSchemas(i)) - assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", key.responseSchemas(i)) - } - } - } -}
