KAFKA-4501; Java 9 compilation and runtime fixes Compilation error fixes: - Avoid ambiguity error when appending to Properties in Scala code (https://github.com/scala/bug/issues/10418) - Use position() and limit() to fix ambiguity issue (https://github.com/scala/bug/issues/10418#issuecomment-316364778) - Disable findBugs if Java 9 is used (https://github.com/findbugsproject/findbugs/issues/105)
Compilation warning fixes: - Avoid deprecated Class.newInstance in Utils.newInstance - Silence a few Java 9 deprecation warnings - var -> val and unused fixes Runtime error fixes: - Introduce Base64 class that works in Java 7 and Java 9 Also: - Set --release option if building with Java 9 Note that tests involving EasyMock (https://github.com/easymock/easymock/issues/193) or PowerMock (https://github.com/powermock/powermock/issues/783) will fail as neither supports Java 9 currently. Author: Ismael Juma <ism...@juma.me.uk> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3647 from ijuma/kafka-4501-support-java-9 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed96523a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed96523a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed96523a Branch: refs/heads/trunk Commit: ed96523a2c763b48399a9720dcce0af44f5fc1a1 Parents: 3e22c1c Author: Ismael Juma <ism...@juma.me.uk> Authored: Sat Aug 19 08:55:29 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Sat Aug 19 08:55:29 2017 +0100 ---------------------------------------------------------------------- build.gradle | 38 ++- checkstyle/import-control.xml | 2 - .../security/plain/PlainSaslServerProvider.java | 3 +- .../security/scram/ScramCredentialUtils.java | 15 +- .../common/security/scram/ScramMessages.java | 19 +- .../security/scram/ScramSaslClientProvider.java | 3 +- .../security/scram/ScramSaslServerProvider.java | 3 +- .../org/apache/kafka/common/utils/Base64.java | 261 +++++++++++++++++++ .../org/apache/kafka/common/utils/Crc32C.java | 4 +- .../org/apache/kafka/common/utils/Java.java | 3 + .../org/apache/kafka/common/utils/Utils.java | 12 +- .../authenticator/TestDigestLoginModule.java | 3 +- .../security/scram/ScramFormatterTest.java | 9 +- .../security/scram/ScramMessagesTest.java | 14 +- .../java/org/apache/kafka/test/TestUtils.java | 4 +- 
core/src/main/scala/kafka/Kafka.scala | 3 +- .../main/scala/kafka/admin/ConfigCommand.scala | 3 +- .../kafka/admin/ConsumerGroupCommand.scala | 4 +- .../main/scala/kafka/admin/TopicCommand.scala | 3 +- .../scala/kafka/admin/ZkSecurityMigrator.scala | 2 +- .../main/scala/kafka/api/FetchResponse.scala | 2 +- .../main/scala/kafka/api/ProducerRequest.scala | 2 +- .../kafka/controller/KafkaController.scala | 2 +- .../scala/kafka/javaapi/TopicMetadata.scala | 1 - .../consumer/ZookeeperConsumerConnector.scala | 2 +- .../main/scala/kafka/log/AbstractIndex.scala | 12 +- core/src/main/scala/kafka/log/LogCleaner.scala | 6 +- core/src/main/scala/kafka/log/LogConfig.scala | 5 +- core/src/main/scala/kafka/log/OffsetIndex.scala | 4 +- core/src/main/scala/kafka/log/OffsetMap.scala | 2 +- .../scala/kafka/log/ProducerStateManager.scala | 4 +- core/src/main/scala/kafka/log/TimeIndex.scala | 2 +- .../kafka/message/ByteBufferMessageSet.scala | 2 +- core/src/main/scala/kafka/message/Message.scala | 4 +- .../scala/kafka/producer/ProducerPool.scala | 3 +- .../scala/kafka/producer/SyncProducer.scala | 2 +- .../scala/kafka/security/auth/Operation.scala | 2 - .../kafka/security/auth/PermissionType.scala | 2 - .../security/auth/SimpleAclAuthorizer.scala | 1 - .../main/scala/kafka/server/ConfigHandler.scala | 4 +- .../scala/kafka/server/DelayedOperation.scala | 2 +- .../kafka/server/DelayedOperationKey.scala | 1 - .../main/scala/kafka/server/KafkaConfig.scala | 5 +- .../scala/kafka/tools/ConsoleConsumer.scala | 9 +- .../scala/kafka/tools/ConsoleProducer.scala | 5 +- .../scala/kafka/tools/DumpLogSegments.scala | 4 +- .../scala/kafka/tools/ExportZkOffsets.scala | 2 +- .../main/scala/kafka/tools/GetOffsetShell.scala | 4 +- .../scala/kafka/tools/ImportZkOffsets.scala | 1 - core/src/main/scala/kafka/tools/JmxTool.scala | 10 +- .../scala/kafka/tools/ProducerPerformance.scala | 5 +- .../scala/kafka/tools/ReplayLogProducer.scala | 2 +- core/src/main/scala/kafka/utils/CoreUtils.scala | 13 +- 
core/src/main/scala/kafka/utils/Implicits.scala | 49 ++++ .../scala/kafka/utils/ReplicationUtils.scala | 1 - .../src/main/scala/kafka/utils/ToolsUtils.scala | 9 +- .../kafka/api/AdminClientIntegrationTest.scala | 8 +- .../kafka/api/BaseConsumerTest.scala | 2 +- .../kafka/api/EndToEndClusterIdTest.scala | 3 +- .../kafka/api/IntegrationTestHarness.scala | 7 +- .../kafka/api/ProducerBounceTest.scala | 3 +- .../api/SaslEndToEndAuthorizationTest.scala | 3 +- .../api/SaslSslAdminClientIntegrationTest.scala | 2 +- ...tenersWithSameSecurityProtocolBaseTest.scala | 3 +- .../other/kafka/ReplicationQuotasTestRig.scala | 3 +- .../other/kafka/TestLinearWriteSpeed.scala | 4 +- .../unit/kafka/admin/ConfigCommandTest.scala | 2 +- .../ZookeeperConsumerConnectorTest.scala | 3 +- .../TransactionMarkerChannelManagerTest.scala | 6 +- .../log/AbstractLogCleanerIntegrationTest.scala | 3 +- .../message/ByteBufferMessageSetTest.scala | 4 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 29 ++- .../test/scala/unit/kafka/utils/UtilsTest.scala | 8 +- 73 files changed, 505 insertions(+), 182 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 48f3f2f..f9862c6 100644 --- a/build.gradle +++ b/build.gradle @@ -82,7 +82,7 @@ ext { maxPermSizeArgs = [] if (!JavaVersion.current().isJava8Compatible()) - maxPermSizeArgs = ['-XX:MaxPermSize=512m'] + maxPermSizeArgs += '-XX:MaxPermSize=512m' userMaxForks = project.hasProperty('maxParallelForks') ? 
maxParallelForks.toInteger() : null @@ -137,14 +137,24 @@ subprojects { apply plugin: 'maven' apply plugin: 'signing' apply plugin: 'checkstyle' - apply plugin: 'findbugs' + + if (!JavaVersion.current().isJava9Compatible()) + apply plugin: 'findbugs' sourceCompatibility = 1.7 + targetCompatibility = 1.7 compileJava { options.encoding = 'UTF-8' - // Add unchecked once we drop support for Java 7 as @SuppressWarnings("unchecked") is too buggy in Java 7 options.compilerArgs << "-Xlint:deprecation" + // -Xlint:unchecked is too buggy in Java 7, so we only enable for Java 8 or higher + if (JavaVersion.current().isJava8Compatible()) + options.compilerArgs << "-Xlint:unchecked" + // --release is the recommended way to select the target release, but it's only supported in Java 9 so we also + // set --source and --target via `sourceCompatibility` and `targetCompatibility`. If/when Gradle supports `--release` + // natively (https://github.com/gradle/gradle/issues/2510), we should switch to that. + if (JavaVersion.current().isJava9Compatible()) + options.compilerArgs << "--release" << "7" } uploadArchives { @@ -349,17 +359,19 @@ subprojects { } test.dependsOn('checkstyleMain', 'checkstyleTest') - findbugs { - toolVersion = "3.0.1" - excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml") - ignoreFailures = false - } - test.dependsOn('findbugsMain') + if (!JavaVersion.current().isJava9Compatible()) { + findbugs { + toolVersion = "3.0.1" + excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml") + ignoreFailures = false + } + test.dependsOn('findbugsMain') - tasks.withType(FindBugs) { - reports { - xml.enabled (project.hasProperty('xmlFindBugsReport')) - html.enabled (!project.hasProperty('xmlFindBugsReport')) + tasks.withType(FindBugs) { + reports { + xml.enabled(project.hasProperty('xmlFindBugsReport')) + html.enabled(!project.hasProperty('xmlFindBugsReport')) + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/checkstyle/import-control.xml 
---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 4bd907b..8c3e3ae 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -98,7 +98,6 @@ </subpackage> <subpackage name="scram"> <allow pkg="javax.crypto" /> - <allow pkg="javax.xml.bind" /> <allow pkg="org.apache.kafka.common.errors" /> </subpackage> </subpackage> @@ -247,7 +246,6 @@ <subpackage name="test"> <allow pkg="org.apache.kafka" /> <allow pkg="org.bouncycastle" /> - <allow pkg="javax.xml.bind" /> </subpackage> <subpackage name="connect"> http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java index 51998a9..ae14244 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java @@ -25,9 +25,10 @@ public class PlainSaslServerProvider extends Provider { private static final long serialVersionUID = 1L; + @SuppressWarnings("deprecation") protected PlainSaslServerProvider() { super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN Server Provider for Kafka"); - super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName()); + put("SaslServerFactory." 
+ PlainSaslServer.PLAIN_MECHANISM, PlainSaslServerFactory.class.getName()); } public static void initialize() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java index 8120c15..55b0651 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java @@ -19,9 +19,8 @@ package org.apache.kafka.common.security.scram; import java.util.Collection; import java.util.Properties; -import javax.xml.bind.DatatypeConverter; - import org.apache.kafka.common.security.authenticator.CredentialCache; +import org.apache.kafka.common.utils.Base64; /** * SCRAM Credential persistence utility functions. 
Implements format conversion used @@ -41,11 +40,11 @@ public class ScramCredentialUtils { public static String credentialToString(ScramCredential credential) { return String.format("%s=%s,%s=%s,%s=%s,%s=%d", SALT, - DatatypeConverter.printBase64Binary(credential.salt()), + Base64.encoder().encodeToString(credential.salt()), STORED_KEY, - DatatypeConverter.printBase64Binary(credential.storedKey()), + Base64.encoder().encodeToString(credential.storedKey()), SERVER_KEY, - DatatypeConverter.printBase64Binary(credential.serverKey()), + Base64.encoder().encodeToString(credential.serverKey()), ITERATIONS, credential.iterations()); } @@ -56,9 +55,9 @@ public class ScramCredentialUtils { !props.containsKey(SERVER_KEY) || !props.containsKey(ITERATIONS)) { throw new IllegalArgumentException("Credentials not valid: " + str); } - byte[] salt = DatatypeConverter.parseBase64Binary(props.getProperty(SALT)); - byte[] storedKey = DatatypeConverter.parseBase64Binary(props.getProperty(STORED_KEY)); - byte[] serverKey = DatatypeConverter.parseBase64Binary(props.getProperty(SERVER_KEY)); + byte[] salt = Base64.decoder().decode(props.getProperty(SALT)); + byte[] storedKey = Base64.decoder().decode(props.getProperty(STORED_KEY)); + byte[] serverKey = Base64.decoder().decode(props.getProperty(SERVER_KEY)); int iterations = Integer.parseInt(props.getProperty(ITERATIONS)); return new ScramCredential(salt, storedKey, serverKey, iterations); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java index 6fd117d..1ad7266 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java +++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java @@ -16,12 +16,13 @@ */ package org.apache.kafka.common.security.scram; +import org.apache.kafka.common.utils.Base64; + import java.nio.charset.StandardCharsets; import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.security.sasl.SaslException; -import javax.xml.bind.DatatypeConverter; /** * SCRAM request/response message creation and parsing based on @@ -140,7 +141,7 @@ public class ScramMessages { } this.nonce = matcher.group("nonce"); String salt = matcher.group("salt"); - this.salt = DatatypeConverter.parseBase64Binary(salt); + this.salt = Base64.decoder().decode(salt); } public ServerFirstMessage(String clientNonce, String serverNonce, byte[] salt, int iterations) { this.nonce = clientNonce + serverNonce; @@ -157,7 +158,7 @@ public class ScramMessages { return iterations; } String toMessage() { - return String.format("r=%s,s=%s,i=%d", nonce, DatatypeConverter.printBase64Binary(salt), iterations); + return String.format("r=%s,s=%s,i=%d", nonce, Base64.encoder().encodeToString(salt), iterations); } } /** @@ -184,9 +185,9 @@ public class ScramMessages { if (!matcher.matches()) throw new SaslException("Invalid SCRAM client final message format: " + message); - this.channelBinding = DatatypeConverter.parseBase64Binary(matcher.group("channel")); + this.channelBinding = Base64.decoder().decode(matcher.group("channel")); this.nonce = matcher.group("nonce"); - this.proof = DatatypeConverter.parseBase64Binary(matcher.group("proof")); + this.proof = Base64.decoder().decode(matcher.group("proof")); } public ClientFinalMessage(byte[] channelBinding, String nonce) { this.channelBinding = channelBinding; @@ -206,13 +207,13 @@ public class ScramMessages { } public String clientFinalMessageWithoutProof() { return String.format("c=%s,r=%s", - DatatypeConverter.printBase64Binary(channelBinding), + Base64.encoder().encodeToString(channelBinding), nonce); } String toMessage() { 
return String.format("%s,p=%s", clientFinalMessageWithoutProof(), - DatatypeConverter.printBase64Binary(proof)); + Base64.encoder().encodeToString(proof)); } } /** @@ -243,7 +244,7 @@ public class ScramMessages { // ignore } if (error == null) { - this.serverSignature = DatatypeConverter.parseBase64Binary(matcher.group("signature")); + this.serverSignature = Base64.decoder().decode(matcher.group("signature")); this.error = null; } else { this.serverSignature = null; @@ -264,7 +265,7 @@ public class ScramMessages { if (error != null) return "e=" + error; else - return "v=" + DatatypeConverter.printBase64Binary(serverSignature); + return "v=" + Base64.encoder().encodeToString(serverSignature); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java index fac673e..d389f04 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java @@ -25,10 +25,11 @@ public class ScramSaslClientProvider extends Provider { private static final long serialVersionUID = 1L; + @SuppressWarnings("deprecation") protected ScramSaslClientProvider() { super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider for Kafka"); for (ScramMechanism mechanism : ScramMechanism.values()) - super.put("SaslClientFactory." + mechanism.mechanismName(), ScramSaslClientFactory.class.getName()); + put("SaslClientFactory." 
+ mechanism.mechanismName(), ScramSaslClientFactory.class.getName()); } public static void initialize() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java index 2f768a3..9f2a6b3 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java +++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java @@ -25,10 +25,11 @@ public class ScramSaslServerProvider extends Provider { private static final long serialVersionUID = 1L; + @SuppressWarnings("deprecation") protected ScramSaslServerProvider() { super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider for Kafka"); for (ScramMechanism mechanism : ScramMechanism.values()) - super.put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName()); + put("SaslServerFactory." + mechanism.mechanismName(), ScramSaslServerFactory.class.getName()); } public static void initialize() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java new file mode 100644 index 0000000..e06e1ee --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.utils; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; + +/** + * Temporary class in order to support Java 7 and Java 9. `DatatypeConverter` is not in the base module of Java 9 + * and `java.util.Base64` was only introduced in Java 8. 
+ */ +public final class Base64 { + + private static final Factory FACTORY; + + static { + if (Java.IS_JAVA8_COMPATIBLE) + FACTORY = new Java8Factory(); + else + FACTORY = new Java7Factory(); + } + + private Base64() {} + + public static Encoder encoder() { + return FACTORY.encoder(); + } + + public static Encoder urlEncoderNoPadding() { + return FACTORY.urlEncoderNoPadding(); + } + + public static Decoder decoder() { + return FACTORY.decoder(); + } + + /* Contains a subset of methods from java.util.Base64.Encoder (introduced in Java 8) */ + public interface Encoder { + String encodeToString(byte[] bytes); + } + + /* Contains a subset of methods from java.util.Base64.Decoder (introduced in Java 8) */ + public interface Decoder { + byte[] decode(String string); + } + + private interface Factory { + Encoder urlEncoderNoPadding(); + Encoder encoder(); + Decoder decoder(); + } + + private static class Java8Factory implements Factory { + + // Static final MethodHandles are optimised better by HotSpot + private static final MethodHandle URL_ENCODE_NO_PADDING; + private static final MethodHandle ENCODE; + private static final MethodHandle DECODE; + + private static final Encoder URL_ENCODER_NO_PADDING; + private static final Encoder ENCODER; + private static final Decoder DECODER; + + static { + try { + Class<?> base64Class = Class.forName("java.util.Base64"); + + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + + Class<?> juEncoderClass = Class.forName("java.util.Base64$Encoder"); + + MethodHandle getEncoder = lookup.findStatic(base64Class, "getEncoder", + MethodType.methodType(juEncoderClass)); + Object juEncoder; + try { + juEncoder = getEncoder.invoke(); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + MethodHandle encode = lookup.findVirtual(juEncoderClass, "encodeToString", + MethodType.methodType(String.class, byte[].class)); + ENCODE = 
encode.bindTo(juEncoder); + + + MethodHandle getUrlEncoder = lookup.findStatic(base64Class, "getUrlEncoder", + MethodType.methodType(juEncoderClass)); + Object juUrlEncoderNoPassing; + try { + juUrlEncoderNoPassing = lookup.findVirtual(juEncoderClass, "withoutPadding", + MethodType.methodType(juEncoderClass)).invoke(getUrlEncoder.invoke()); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + URL_ENCODE_NO_PADDING = encode.bindTo(juUrlEncoderNoPassing); + + Class<?> juDecoderClass = Class.forName("java.util.Base64$Decoder"); + MethodHandle getDecoder = lookup.findStatic(base64Class, "getDecoder", + MethodType.methodType(juDecoderClass)); + MethodHandle decode = lookup.findVirtual(juDecoderClass, "decode", + MethodType.methodType(byte[].class, String.class)); + try { + DECODE = decode.bindTo(getDecoder.invoke()); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + + URL_ENCODER_NO_PADDING = new Encoder() { + @Override + public String encodeToString(byte[] bytes) { + try { + return (String) URL_ENCODE_NO_PADDING.invokeExact(bytes); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + } + }; + + ENCODER = new Encoder() { + @Override + public String encodeToString(byte[] bytes) { + try { + return (String) ENCODE.invokeExact(bytes); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + } + }; + + DECODER = new Decoder() { + @Override + public byte[] decode(String string) { + try { + return (byte[]) DECODE.invokeExact(string); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + } + }; + + } catch 
(ReflectiveOperationException e) { + // Should never happen + throw new RuntimeException(e); + } + } + + @Override + public Encoder urlEncoderNoPadding() { + return URL_ENCODER_NO_PADDING; + } + + @Override + public Encoder encoder() { + return ENCODER; + } + + @Override + public Decoder decoder() { + return DECODER; + } + } + + private static class Java7Factory implements Factory { + + // Static final MethodHandles are optimised better by HotSpot + private static final MethodHandle PRINT; + private static final MethodHandle PARSE; + + static { + try { + Class<?> cls = Class.forName("javax.xml.bind.DatatypeConverter"); + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + PRINT = lookup.findStatic(cls, "printBase64Binary", MethodType.methodType(String.class, + byte[].class)); + PARSE = lookup.findStatic(cls, "parseBase64Binary", MethodType.methodType(byte[].class, + String.class)); + } catch (ReflectiveOperationException e) { + // Should never happen + throw new RuntimeException(e); + } + } + + public static final Encoder URL_ENCODER_NO_PADDING = new Encoder() { + + @Override + public String encodeToString(byte[] bytes) { + String base64EncodedUUID = Java7Factory.encodeToString(bytes); + //Convert to URL safe variant by replacing + and / with - and _ respectively. + String urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-") + .replace("/", "_"); + // Remove the "==" padding at the end. 
+ return urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length() - 2); + } + + }; + + public static final Encoder ENCODER = new Encoder() { + @Override + public String encodeToString(byte[] bytes) { + return Java7Factory.encodeToString(bytes); + } + }; + + public static final Decoder DECODER = new Decoder() { + @Override + public byte[] decode(String string) { + try { + return (byte[]) PARSE.invokeExact(string); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + } + }; + + private static String encodeToString(byte[] bytes) { + try { + return (String) PRINT.invokeExact(bytes); + } catch (Throwable throwable) { + // Invoked method doesn't throw checked exceptions, so safe to cast + throw (RuntimeException) throwable; + } + } + + @Override + public Encoder urlEncoderNoPadding() { + return URL_ENCODER_NO_PADDING; + } + + @Override + public Encoder encoder() { + return ENCODER; + } + + @Override + public Decoder decoder() { + return DECODER; + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java index b547beb..dfe22e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java @@ -32,7 +32,7 @@ import java.util.zip.Checksum; * * NOTE: This class is intended for INTERNAL usage only within Kafka. 
*/ -public class Crc32C { +public final class Crc32C { private static final ChecksumFactory CHECKSUM_FACTORY; @@ -43,6 +43,8 @@ public class Crc32C { CHECKSUM_FACTORY = new PureJavaChecksumFactory(); } + private Crc32C() {} + /** * Compute the CRC32C (Castagnoli) of the segment of the byte array given by the specified size and offset * http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Java.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Java.java b/clients/src/main/java/org/apache/kafka/common/utils/Java.java index b374c24..38d9541 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Java.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Java.java @@ -41,6 +41,9 @@ public final class Java { public static final boolean IS_JAVA9_COMPATIBLE = JVM_MAJOR_VERSION > 1 || (JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 9); + public static final boolean IS_JAVA8_COMPATIBLE = JVM_MAJOR_VERSION > 1 || + (JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 8); + public static boolean isIBMJdk() { return System.getProperty("java.vendor").contains("IBM"); } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 75f8cf7..ee82f9a 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -284,14 +284,14 @@ public class Utils { * Instantiate the class */ public static <T> T newInstance(Class<T> c) { + if (c == null) + throw new KafkaException("class cannot be null"); try { - return c.newInstance(); - } catch (IllegalAccessException e) { + 
return c.getDeclaredConstructor().newInstance(); + } catch (NoSuchMethodException e) { + throw new KafkaException("Could not find a public no-argument constructor for " + c.getName(), e); + } catch (ReflectiveOperationException | RuntimeException e) { throw new KafkaException("Could not instantiate class " + c.getName(), e); - } catch (InstantiationException e) { - throw new KafkaException("Could not instantiate class " + c.getName() + " Does it have a public no-argument constructor?", e); - } catch (NullPointerException e) { - throw new KafkaException("Requested class was null", e); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java index 8f9bce5..f1ef740 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java @@ -105,9 +105,10 @@ public class TestDigestLoginModule extends PlainLoginModule { private static final long serialVersionUID = 1L; + @SuppressWarnings("deprecation") protected DigestSaslServerProvider() { super("Test SASL/Digest-MD5 Server Provider", 1.0, "Test SASL/Digest-MD5 Server Provider for Kafka"); - super.put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName()); + put("SaslServerFactory.DIGEST-MD5", TestDigestLoginModule.DigestSaslServerFactory.class.getName()); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java 
---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java index 371031b..a86e0dd 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java @@ -16,10 +16,9 @@ */ package org.apache.kafka.common.security.scram; +import org.apache.kafka.common.utils.Base64; import org.junit.Test; -import javax.xml.bind.DatatypeConverter; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -55,13 +54,13 @@ public class ScramFormatterTest { String serverNonce = serverFirst.nonce().substring(clientNonce.length()); assertEquals("%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0", serverNonce); byte[] salt = serverFirst.salt(); - assertArrayEquals(DatatypeConverter.parseBase64Binary("W22ZaJ0SNY7soEsUEjb6gQ=="), salt); + assertArrayEquals(Base64.decoder().decode("W22ZaJ0SNY7soEsUEjb6gQ=="), salt); int iterations = serverFirst.iterations(); assertEquals(4096, iterations); byte[] channelBinding = clientFinal.channelBinding(); - assertArrayEquals(DatatypeConverter.parseBase64Binary("biws"), channelBinding); + assertArrayEquals(Base64.decoder().decode("biws"), channelBinding); byte[] serverSignature = serverFinal.serverSignature(); - assertArrayEquals(DatatypeConverter.parseBase64Binary("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature); + assertArrayEquals(Base64.decoder().decode("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature); byte[] saltedPassword = formatter.saltedPassword(password, salt, iterations); byte[] serverKey = formatter.serverKey(saltedPassword); http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java 
---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java index de97ce2..89e6260 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.common.security.scram; +import org.apache.kafka.common.utils.Base64; import org.junit.Before; import org.junit.Test; import java.nio.charset.StandardCharsets; import javax.security.sasl.SaslException; -import javax.xml.bind.DatatypeConverter; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -288,11 +288,11 @@ public class ScramMessagesTest { } private String randomBytesAsString() { - return DatatypeConverter.printBase64Binary(formatter.secureRandomBytes()); + return Base64.encoder().encodeToString(formatter.secureRandomBytes()); } private byte[] toBytes(String base64Str) { - return DatatypeConverter.parseBase64Binary(base64Str); + return Base64.decoder().decode(base64Str); }; private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) { @@ -303,14 +303,14 @@ public class ScramMessagesTest { private void checkServerFirstMessage(ServerFirstMessage message, String nonce, String salt, int iterations) { assertEquals(nonce, message.nonce()); - assertArrayEquals(DatatypeConverter.parseBase64Binary(salt), message.salt()); + assertArrayEquals(Base64.decoder().decode(salt), message.salt()); assertEquals(iterations, message.iterations()); } private void checkClientFinalMessage(ClientFinalMessage message, String channelBinding, String nonce, String proof) { - assertArrayEquals(DatatypeConverter.parseBase64Binary(channelBinding), message.channelBinding()); + 
assertArrayEquals(Base64.decoder().decode(channelBinding), message.channelBinding()); assertEquals(nonce, message.nonce()); - assertArrayEquals(DatatypeConverter.parseBase64Binary(proof), message.proof()); + assertArrayEquals(Base64.decoder().decode(proof), message.proof()); } private void checkServerFinalMessage(ServerFinalMessage message, String error, String serverSignature) { @@ -318,7 +318,7 @@ public class ScramMessagesTest { if (serverSignature == null) assertNull("Unexpected server signature", message.serverSignature()); else - assertArrayEquals(DatatypeConverter.parseBase64Binary(serverSignature), message.serverSignature()); + assertArrayEquals(Base64.decoder().decode(serverSignature), message.serverSignature()); } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index a27d5cb..47ca823 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -22,11 +22,11 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.Base64; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.xml.bind.DatatypeConverter; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -292,7 +292,7 @@ public class TestUtils { // Convert into normal variant and add padding at the end. 
String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+")); - byte[] decodedUuid = DatatypeConverter.parseBase64Binary(originalClusterId); + byte[] decodedUuid = Base64.decoder().decode(originalClusterId); // We expect 16 bytes, same as the input UUID. assertEquals(decodedUuid.length, 16); http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/Kafka.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 1c16c96..7c5b420 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -20,6 +20,7 @@ package kafka import java.util.Properties import joptsimple.OptionParser +import kafka.utils.Implicits._ import kafka.server.{KafkaServer, KafkaServerStartable} import kafka.utils.{CommandLineUtils, Exit, Logging} import org.apache.kafka.common.utils.Utils @@ -47,7 +48,7 @@ object Kafka extends Logging { CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(",")) } - props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)) + props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala) } props } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/ConfigCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index b18dcc9..366667b 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -24,6 +24,7 @@ import kafka.common.InvalidConfigException import kafka.log.LogConfig import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId} import 
kafka.utils.{CommandLineUtils, ZkUtils} +import kafka.utils.Implicits._ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram._ import org.apache.kafka.common.utils.Utils @@ -95,7 +96,7 @@ object ConfigCommand extends Config { if (invalidConfigs.nonEmpty) throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") - configs.putAll(configsToBeAdded) + configs ++= configsToBeAdded configsToBeDeleted.foreach(configs.remove(_)) utils.changeConfigs(zkUtils, entityType, entityName, configs) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index d1cd803..1100e87 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -25,6 +25,7 @@ import joptsimple.{OptionParser, OptionSpec} import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} import kafka.client.ClientUtils import kafka.common.{OffsetMetadataAndError, TopicAndPartition} +import kafka.utils.Implicits._ import kafka.consumer.SimpleConsumer import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNoNodeException @@ -514,7 +515,8 @@ object ConsumerGroupCommand extends Logging { properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000") properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer) properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) - if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))) + if (opts.options.has(opts.commandConfigOpt)) + properties ++= Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) new 
KafkaConsumer(properties) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 882fe21..2d3a76c 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -21,6 +21,7 @@ import java.util.Properties import joptsimple._ import kafka.common.AdminCommandFailedException +import kafka.utils.Implicits._ import kafka.consumer.Whitelist import kafka.log.LogConfig import kafka.server.ConfigType @@ -130,7 +131,7 @@ object TopicCommand extends Logging { val configsToBeAdded = parseTopicConfigsToBeAdded(opts) val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts) // compile the final set of configs - configs.putAll(configsToBeAdded) + configs ++= configsToBeAdded configsToBeDeleted.foreach(config => configs.remove(config)) AdminUtils.changeTopicConfig(zkUtils, topic, configs) println("Updated config for topic \"%s\".".format(topic)) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala index e1d6e02..82b7dac 100644 --- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala +++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala @@ -62,7 +62,7 @@ object ZkSecurityMigrator extends Logging { + "authentication.") def run(args: Array[String]) { - var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) val parser = new OptionParser(false) val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the 
Kafka znodes in ZooKeeper secure or unsecure." + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String]) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/api/FetchResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala index b5ef912..ae2f19c 100644 --- a/core/src/main/scala/kafka/api/FetchResponse.scala +++ b/core/src/main/scala/kafka/api/FetchResponse.scala @@ -33,7 +33,7 @@ object FetchResponsePartitionData { val messageSetSize = buffer.getInt val messageSetBuffer = buffer.slice() messageSetBuffer.limit(messageSetSize) - buffer.position(buffer.position + messageSetSize) + buffer.position(buffer.position() + messageSetSize) new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/api/ProducerRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala index 921e011..9cdb14b 100644 --- a/core/src/main/scala/kafka/api/ProducerRequest.scala +++ b/core/src/main/scala/kafka/api/ProducerRequest.scala @@ -93,7 +93,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion, val partitionMessageData = partitionAndData._2 val bytes = partitionMessageData.buffer buffer.putInt(partition) - buffer.putInt(bytes.limit) + buffer.putInt(bytes.limit()) buffer.put(bytes) bytes.rewind }) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala index 780ae52..4b07751 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -764,7 +764,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met topicDeleted || successful }.keys reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p)) - var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap + val partitionsToReassign = mutable.Map[TopicAndPartition, ReassignedPartitionsContext]() partitionsToReassign ++= partitionsBeingReassigned partitionsToReassign --= reassignedPartitions controllerContext.partitionsBeingReassigned ++= partitionsToReassign http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala index c9ec48a..051445c 100644 --- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala +++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala @@ -17,7 +17,6 @@ package kafka.javaapi import kafka.cluster.BrokerEndPoint -import org.apache.kafka.common.protocol.Errors import scala.collection.JavaConverters._ private[javaapi] object MetadataListImplicits { http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala index e3b2ec1..467d0a6 100644 --- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala +++ 
b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala @@ -85,7 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder) val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]] for ((topic, streams) <- scalaReturn) { - var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]] + val javaStreamList = new java.util.ArrayList[KafkaStream[K,V]] for (stream <- streams) javaStreamList.add(stream) ret.put(topic, javaStreamList) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/AbstractIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index d569ad9..2d7cc7e 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -17,7 +17,7 @@ package kafka.log -import java.io.{File, IOException, RandomAccessFile} +import java.io.{File, RandomAccessFile} import java.nio.{ByteBuffer, MappedByteBuffer} import java.nio.channels.FileChannel import java.util.concurrent.locks.{Lock, ReentrantLock} @@ -69,7 +69,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon idx.position(0) else // if this is a pre-existing index, assume it is valid and set position to last entry - idx.position(roundDownToExactMultiple(idx.limit, entrySize)) + idx.position(roundDownToExactMultiple(idx.limit(), entrySize)) idx } finally { CoreUtils.swallow(raf.close()) @@ -80,11 +80,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon * The maximum number of entries this index can hold */ @volatile - private[this] var _maxEntries = mmap.limit / entrySize + private[this] var _maxEntries = mmap.limit() / entrySize /** The number of entries in this index */ 
@volatile - protected var _entries = mmap.position / entrySize + protected var _entries = mmap.position() / entrySize /** * True iff there are no more slots available in this index @@ -105,7 +105,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon inLock(lock) { val raf = new RandomAccessFile(file, "rw") val roundedNewSize = roundDownToExactMultiple(newSize, entrySize) - val position = mmap.position + val position = mmap.position() /* Windows won't let us modify the file length while the file is mmapped :-( */ if (OperatingSystem.IS_WINDOWS) @@ -113,7 +113,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon try { raf.setLength(roundedNewSize) mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize) - _maxEntries = mmap.limit / entrySize + _maxEntries = mmap.limit() / entrySize mmap.position(position) } finally { CoreUtils.swallow(raf.close()) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 85d6487..4f53b41 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -550,7 +550,7 @@ private[log] class Cleaner(val id: Int, // if any messages are to be retained, write them out val outputBuffer = result.output - if (outputBuffer.position > 0) { + if (outputBuffer.position() > 0) { outputBuffer.flip() val retained = MemoryRecords.readableRecords(outputBuffer) dest.append(firstOffset = retained.batches.iterator.next().baseOffset, @@ -558,11 +558,11 @@ private[log] class Cleaner(val id: Int, largestTimestamp = result.maxTimestamp, shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained) - throttler.maybeThrottle(outputBuffer.limit) + 
throttler.maybeThrottle(outputBuffer.limit()) } // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again - if (readBuffer.limit > 0 && result.messagesRead == 0) + if (readBuffer.limit() > 0 && result.messagesRead == 0) growBuffers(maxLogMessageSize) } restoreBuffers() http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 8f82e65..c47d229 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import kafka.api.ApiVersion import kafka.message.{BrokerCompressionCodec, Message} import kafka.server.{KafkaConfig, ThrottledReplicaListValidator} +import kafka.utils.Implicits._ import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig} import org.apache.kafka.common.record.TimestampType @@ -269,8 +270,8 @@ object LogConfig { */ def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = { val props = new Properties() - props.putAll(defaults) - props.putAll(overrides) + defaults.asScala.foreach { case (k, v) => props.put(k, v) } + props ++= overrides LogConfig(props) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/OffsetIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 53c18fe..c156972 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -58,7 +58,7 @@ class OffsetIndex(_file: File, baseOffset: 
Long, maxIndexSize: Int = -1, writabl private[this] var _lastOffset = lastEntry.offset debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d" - .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position)) + .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position())) /** * The last entry in the index @@ -144,7 +144,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl mmap.putInt(position) _entries += 1 _lastOffset = offset - require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".") + require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".") } else { throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s." .format(offset, entries, _lastOffset, file.getAbsolutePath)) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/OffsetMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala index 8b493c2..219bed3 100755 --- a/core/src/main/scala/kafka/log/OffsetMap.scala +++ b/core/src/main/scala/kafka/log/OffsetMap.scala @@ -149,7 +149,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend this.lookups = 0L this.probes = 0L this.lastOffset = -1L - Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte) + Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/ProducerStateManager.scala ---------------------------------------------------------------------- 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index ce56a6c..974a50e 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -338,12 +338,12 @@ object ProducerStateManager { buffer.flip() // now fill in the CRC - val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset) + val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit() - ProducerEntriesOffset) ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc) val fos = new FileOutputStream(file) try { - fos.write(buffer.array, buffer.arrayOffset, buffer.limit) + fos.write(buffer.array, buffer.arrayOffset, buffer.limit()) } finally { fos.close() } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/TimeIndex.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 6c9c32b..aab9300 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -126,7 +126,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: mmap.putLong(timestamp) mmap.putInt((offset - baseOffset).toInt) _entries += 1 - require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".") + require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".") } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
index c6fa1ce..62e2125 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -174,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi /** * The total number of bytes in this message set, including any partial trailing messages */ - def sizeInBytes: Int = buffer.limit + def sizeInBytes: Int = buffer.limit() /** * The total number of bytes in this message set not including any partial, trailing messages http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/message/Message.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala index 3530929..a469901 100755 --- a/core/src/main/scala/kafka/message/Message.scala +++ b/core/src/main/scala/kafka/message/Message.scala @@ -222,7 +222,7 @@ class Message(val buffer: ByteBuffer, * Compute the checksum of the message from the message contents */ def computeChecksum: Long = - Crc32.crc32(buffer, MagicOffset, buffer.limit - MagicOffset) + Crc32.crc32(buffer, MagicOffset, buffer.limit() - MagicOffset) /** * Retrieve the previously computed CRC for this message @@ -245,7 +245,7 @@ class Message(val buffer: ByteBuffer, /** * The complete serialized size of this message in bytes (including crc, header attributes, etc) */ - def size: Int = buffer.limit + def size: Int = buffer.limit() /** * The position where the key size is stored. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/producer/ProducerPool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 60cef63..6d4e4b7 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -23,6 +23,7 @@ import kafka.api.TopicMetadata import kafka.cluster.BrokerEndPoint import kafka.common.UnavailableProducerException import kafka.utils.Logging +import kafka.utils.Implicits._ import scala.collection.mutable.HashMap @@ -35,7 +36,7 @@ object ProducerPool { val props = new Properties() props.put("host", broker.host) props.put("port", broker.port.toString) - props.putAll(config.props.props) + props ++= config.props.props new SyncProducer(new SyncProducerConfig(props)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/producer/SyncProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index f02648f..04527c8 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -58,7 +58,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { */ if (logger.isDebugEnabled) { val buffer = new RequestOrResponseSend("", request).buffer - trace("verifying sendbuffer of size " + buffer.limit) + trace("verifying sendbuffer of size " + buffer.limit()) val requestTypeId = buffer.getShort() if(requestTypeId == ApiKeys.PRODUCE.id) { val request = ProducerRequest.readFrom(buffer) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/security/auth/Operation.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala index a13345a..0fa311a 100644 --- a/core/src/main/scala/kafka/security/auth/Operation.scala +++ b/core/src/main/scala/kafka/security/auth/Operation.scala @@ -19,8 +19,6 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} import org.apache.kafka.common.acl.AclOperation -import scala.util.{Failure, Success, Try} - /** * Different operations a client may perform on kafka resources. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/security/auth/PermissionType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala index c603351..c75e6f6 100644 --- a/core/src/main/scala/kafka/security/auth/PermissionType.scala +++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala @@ -19,8 +19,6 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} import org.apache.kafka.common.acl.AclPermissionType -import scala.util.{Failure, Success, Try} - sealed trait PermissionType extends BaseEnum { val toJava: AclPermissionType } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 03eb9e3..6218a2c 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -26,7 +26,6 @@ import kafka.server.KafkaConfig import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import 
kafka.utils._ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException} -import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.auth.KafkaPrincipal import scala.collection.JavaConverters._ import org.apache.log4j.Logger http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/ConfigHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 149a1c1..79ffde8 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -25,6 +25,7 @@ import kafka.log.{LogConfig, LogManager} import kafka.security.CredentialProvider import kafka.server.Constants._ import kafka.server.QuotaFactory.QuotaManagers +import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.common.config.ConfigDef.Validator import org.apache.kafka.common.config.ConfigException @@ -32,7 +33,6 @@ import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.metrics.Quota._ import scala.collection.JavaConverters._ -import scala.collection.mutable /** * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager @@ -55,7 +55,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC if (logs.nonEmpty) { /* combine the default properties with the overrides in zk to create the new LogConfig */ val props = new Properties() - props.putAll(logManager.defaultConfig.originals) + props ++= logManager.defaultConfig.originals.asScala topicConfig.asScala.foreach { case (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/DelayedOperation.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala index 4ae1b13..8997395 100644 --- a/core/src/main/scala/kafka/server/DelayedOperation.scala +++ b/core/src/main/scala/kafka/server/DelayedOperation.scala @@ -349,7 +349,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri def cancel(): List[T] = { val iter = operations.iterator() - var cancelled = new ListBuffer[T]() + val cancelled = new ListBuffer[T]() while (iter.hasNext) { val curr = iter.next() curr.cancel() http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/DelayedOperationKey.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala index 1933339..bfa7fc2 100644 --- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala +++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala @@ -17,7 +17,6 @@ package kafka.server -import kafka.common.TopicAndPartition import org.apache.kafka.common.TopicPartition /** http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a900e6d..89ba641 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,6 +26,7 @@ import kafka.coordinator.group.OffsetConfig import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager} import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils +import kafka.utils.Implicits._ import 
org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.ConfigDef.ValidList import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig} @@ -875,8 +876,8 @@ object KafkaConfig { def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = { val props = new Properties() - props.putAll(defaults) - props.putAll(overrides) + props ++= defaults + props ++= overrides fromProps(props, doLog) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index e81dba2..a0818bc 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -29,6 +29,7 @@ import kafka.consumer._ import kafka.message._ import kafka.metrics.KafkaMetricsReporter import kafka.utils._ +import kafka.utils.Implicits._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.record.TimestampType @@ -173,8 +174,8 @@ object ConsoleConsumer extends Logging { def getOldConsumerProps(config: ConsumerConfig): Properties = { val props = new Properties - props.putAll(config.consumerProps) - props.putAll(config.extraConsumerProps) + props ++= config.consumerProps + props ++= config.extraConsumerProps setAutoOffsetResetValue(config, props) props.put("zookeeper.connect", config.zkConnectionStr) @@ -201,8 +202,8 @@ object ConsoleConsumer extends Logging { def getNewConsumerProps(config: ConsumerConfig): Properties = { val props = new Properties - props.putAll(config.consumerProps) - props.putAll(config.extraConsumerProps) + props ++= config.consumerProps + props ++= 
config.extraConsumerProps setAutoOffsetResetValue(config, props) props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer) props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ConsoleProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 1b22140..39bb0ff 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -21,6 +21,7 @@ import kafka.common._ import kafka.message._ import kafka.serializer._ import kafka.utils.{CommandLineUtils, Exit, ToolsUtils} +import kafka.utils.Implicits._ import kafka.producer.{NewShinyProducer, OldProducer} import java.util.Properties import java.io._ @@ -74,7 +75,7 @@ object ConsoleProducer { def getReaderProps(config: ProducerConfig): Properties = { val props = new Properties props.put("topic",config.topic) - props.putAll(config.cmdLineProps) + props ++= config.cmdLineProps props } @@ -106,7 +107,7 @@ object ConsoleProducer { if (config.options.has(config.producerConfigOpt)) Utils.loadProps(config.options.valueOf(config.producerConfigOpt)) else new Properties - props.putAll(config.extraProducerProps) + props ++= config.extraProducerProps props } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 7bec15f..025617f 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -443,14 +443,14 @@ object DumpLogSegments 
{ } def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) { - var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) + val outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) if (outOfOrderSeq.isEmpty) outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq) outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp)) } def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) { - var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) + val shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]()) if (shallowOffsetNotFoundSeq.isEmpty) shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq) shallowOffsetNotFoundSeq += ((indexOffset, logOffset)) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala index 49593c2..d8ce9b0 100644 --- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala @@ -17,7 +17,7 @@ package kafka.tools -import java.io.{FileOutputStream, FileWriter, OutputStreamWriter} +import java.io.{FileOutputStream, OutputStreamWriter} import java.nio.charset.StandardCharsets import joptsimple._ http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/GetOffsetShell.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala index f06c412..4104ded 100644 --- 
a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala @@ -71,8 +71,8 @@ object GetOffsetShell { ToolsUtils.validatePortOrDie(parser, brokerList) val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList) val topic = options.valueOf(topicOpt) - var partitionList = options.valueOf(partitionOpt) - var time = options.valueOf(timeOpt).longValue + val partitionList = options.valueOf(partitionOpt) + val time = options.valueOf(timeOpt).longValue val nOffsets = options.valueOf(nOffsetsOpt).intValue val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue() http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala index d96569b..c345f94 100644 --- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala +++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala @@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets import joptsimple._ import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils} -import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.security.JaasUtils http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/JmxTool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala index 3273821..c122141 100644 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ b/core/src/main/scala/kafka/tools/JmxTool.scala @@ -28,7 +28,7 @@ import joptsimple.OptionParser import scala.collection.JavaConverters._ import scala.collection.mutable import scala.math._ -import kafka.utils.{CommandLineUtils, Exit, Logging} +import kafka.utils.{CommandLineUtils , Exit, Logging} /** @@ -177,14 +177,14 @@ 
object JmxTool extends Logging { } def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = { - var attributes = new mutable.HashMap[String, Any]() - for(name <- names) { + val attributes = new mutable.HashMap[String, Any]() + for (name <- names) { val mbean = mbsc.getMBeanInfo(name) - for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) { + for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) { val attr = attrObj.asInstanceOf[Attribute] attributesWhitelist match { case Some(allowedAttributes) => - if(allowedAttributes.contains(attr.getName)) + if (allowedAttributes.contains(attr.getName)) attributes(name + ":" + attr.getName) = attr.getValue case None => attributes(name + ":" + attr.getName) = attr.getValue } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ProducerPerformance.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index 0f21831..77f560b 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -20,6 +20,7 @@ package kafka.tools import kafka.metrics.KafkaMetricsReporter import kafka.producer.{NewShinyProducer, OldProducer} import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties} +import kafka.utils.Implicits._ import kafka.message.CompressionCodec import kafka.serializer._ import java.util.concurrent.{CountDownLatch, Executors} @@ -205,7 +206,7 @@ object ProducerPerformance extends Logging { val producer = if (config.useNewProducer) { import org.apache.kafka.clients.producer.ProducerConfig - props.putAll(config.producerProps) + props ++= config.producerProps props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
config.brokerList) props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString) props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance") @@ -217,7 +218,7 @@ object ProducerPerformance extends Logging { props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") new NewShinyProducer(props) } else { - props.putAll(config.producerProps) + props ++= config.producerProps props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec.codec.toString) props.put("send.buffer.bytes", (64 * 1024).toString) http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala index 5d4cc23..ca9c111 100644 --- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala +++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala @@ -18,7 +18,7 @@ package kafka.tools import joptsimple.OptionParser -import java.util.concurrent.{Executors, CountDownLatch} +import java.util.concurrent.CountDownLatch import java.util.Properties import kafka.consumer._ import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils} http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 0e8855c..ca753d5 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -24,7 +24,6 @@ import java.util.concurrent.locks.{Lock, ReadWriteLock} import java.lang.management._ import java.util.{Properties, UUID} import javax.management._ -import 
javax.xml.bind.DatatypeConverter import org.apache.kafka.common.protocol.SecurityProtocol @@ -32,7 +31,7 @@ import scala.collection._ import scala.collection.mutable import kafka.cluster.EndPoint import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.utils.{KafkaThread, Utils} +import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils} /** * General helper functions! @@ -279,7 +278,7 @@ object CoreUtils extends Logging { def generateUuidAsBase64(): String = { val uuid = UUID.randomUUID() - urlSafeBase64EncodeNoPadding(getBytesFromUuid(uuid)) + Base64.urlEncoderNoPadding.encodeToString(getBytesFromUuid(uuid)) } def getBytesFromUuid(uuid: UUID): Array[Byte] = { @@ -290,14 +289,6 @@ object CoreUtils extends Logging { uuidBytes.array } - def urlSafeBase64EncodeNoPadding(data: Array[Byte]): String = { - val base64EncodedUUID = DatatypeConverter.printBase64Binary(data) - //Convert to URL safe variant by replacing + and / with - and _ respectively. - val urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_") - // Remove the "==" padding at the end. - urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2) - } - def propsWith(key: String, value: String): Properties = { propsWith((key, value)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/Implicits.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Implicits.scala b/core/src/main/scala/kafka/utils/Implicits.scala new file mode 100644 index 0000000..5196d45 --- /dev/null +++ b/core/src/main/scala/kafka/utils/Implicits.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.util +import java.util.Properties + +import scala.collection.JavaConverters._ + +/** + * In order to have these implicits in scope, add the following import: + * + * `import kafka.utils.Implicits._` + */ +object Implicits { + + /** + * The java.util.Properties.putAll override introduced in Java 9 is seen as an overload by the + * Scala compiler causing ambiguity errors in some cases. The `++=` methods introduced via + * implicits provide a concise alternative. + * + * See https://github.com/scala/bug/issues/10418 for more details. 
+ */ + implicit class PropertiesOps(properties: Properties) { + + def ++=(props: Properties): Unit = + (properties: util.Hashtable[AnyRef, AnyRef]).putAll(props) + + def ++=(map: collection.Map[String, AnyRef]): Unit = + (properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava) + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/ReplicationUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala index 9533ce9..cc08055 100644 --- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala +++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala @@ -18,7 +18,6 @@ package kafka.utils import kafka.api.LeaderAndIsr -import kafka.common.TopicAndPartition import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch} import kafka.utils.ZkUtils._ import org.apache.kafka.common.TopicPartition