KAFKA-4501; Java 9 compilation and runtime fixes

Compilation error fixes:
- Avoid ambiguity error when appending to Properties in Scala code
  (https://github.com/scala/bug/issues/10418); a sketch of the workaround
  follows this list
- Use position() and limit() to fix ambiguity issue
  (https://github.com/scala/bug/issues/10418#issuecomment-316364778); a
  sketch follows the FetchResponse.scala diff below
- Disable findBugs if Java 9 is used
  (https://github.com/findbugsproject/findbugs/issues/105)
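
A minimal sketch of the Properties workaround, assuming (as the diffs below
suggest) that the new kafka/utils/Implicits.scala provides the `++=` helper
via an implicit class along these lines:

    import java.util.Properties
    import scala.collection.JavaConverters._

    object PropertiesSketch {
      // Under JDK 9, `props.putAll(other)` trips an overload-resolution
      // ambiguity in Scala (scala/bug#10418), so call sites are migrated to
      // an explicit `++=` that copies entries one by one.
      implicit class PropertiesOps(val props: Properties) extends AnyVal {
        def ++=(other: Properties): Unit =
          other.asScala.foreach { case (k, v) => props.put(k, v) }
      }

      def main(args: Array[String]): Unit = {
        val a = new Properties()
        val b = new Properties()
        b.setProperty("bootstrap.servers", "localhost:9092")
        a ++= b
        println(a.getProperty("bootstrap.servers"))
      }
    }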

Compilation warning fixes:
- Avoid deprecated Class.newInstance in Utils.newInstance
- Silence a few Java 9 deprecation warnings
- var -> val and unused fixes

Runtime error fixes:
- Introduce Base64 class that works in Java 7 and Java 9 (a usage sketch
  follows this list)
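
A usage sketch for the shim, based on the API in the Base64.java diff below
(encoder()/decoder() delegate to java.util.Base64 on Java 8+ and to
javax.xml.bind.DatatypeConverter on Java 7):

    import org.apache.kafka.common.utils.Base64

    object Base64RoundTrip {
      def main(args: Array[String]): Unit = {
        // Round-trip some bytes through the version-agnostic encoder/decoder.
        val encoded = Base64.encoder().encodeToString("kafka".getBytes("UTF-8"))
        val decoded = new String(Base64.decoder().decode(encoded), "UTF-8")
        assert(decoded == "kafka")
        println(encoded)
      }
    }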

Also:
- Set --release option if building with Java 9

Note that tests involving EasyMock 
(https://github.com/easymock/easymock/issues/193)
or PowerMock (https://github.com/powermock/powermock/issues/783)
will fail as neither supports Java 9 currently.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3647 from ijuma/kafka-4501-support-java-9


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ed96523a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ed96523a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ed96523a

Branch: refs/heads/trunk
Commit: ed96523a2c763b48399a9720dcce0af44f5fc1a1
Parents: 3e22c1c
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Sat Aug 19 08:55:29 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Sat Aug 19 08:55:29 2017 +0100

----------------------------------------------------------------------
 build.gradle                                    |  38 ++-
 checkstyle/import-control.xml                   |   2 -
 .../security/plain/PlainSaslServerProvider.java |   3 +-
 .../security/scram/ScramCredentialUtils.java    |  15 +-
 .../common/security/scram/ScramMessages.java    |  19 +-
 .../security/scram/ScramSaslClientProvider.java |   3 +-
 .../security/scram/ScramSaslServerProvider.java |   3 +-
 .../org/apache/kafka/common/utils/Base64.java   | 261 +++++++++++++++++++
 .../org/apache/kafka/common/utils/Crc32C.java   |   4 +-
 .../org/apache/kafka/common/utils/Java.java     |   3 +
 .../org/apache/kafka/common/utils/Utils.java    |  12 +-
 .../authenticator/TestDigestLoginModule.java    |   3 +-
 .../security/scram/ScramFormatterTest.java      |   9 +-
 .../security/scram/ScramMessagesTest.java       |  14 +-
 .../java/org/apache/kafka/test/TestUtils.java   |   4 +-
 core/src/main/scala/kafka/Kafka.scala           |   3 +-
 .../main/scala/kafka/admin/ConfigCommand.scala  |   3 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |   4 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   3 +-
 .../scala/kafka/admin/ZkSecurityMigrator.scala  |   2 +-
 .../main/scala/kafka/api/FetchResponse.scala    |   2 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   2 +-
 .../kafka/controller/KafkaController.scala      |   2 +-
 .../scala/kafka/javaapi/TopicMetadata.scala     |   1 -
 .../consumer/ZookeeperConsumerConnector.scala   |   2 +-
 .../main/scala/kafka/log/AbstractIndex.scala    |  12 +-
 core/src/main/scala/kafka/log/LogCleaner.scala  |   6 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   5 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala |   4 +-
 core/src/main/scala/kafka/log/OffsetMap.scala   |   2 +-
 .../scala/kafka/log/ProducerStateManager.scala  |   4 +-
 core/src/main/scala/kafka/log/TimeIndex.scala   |   2 +-
 .../kafka/message/ByteBufferMessageSet.scala    |   2 +-
 core/src/main/scala/kafka/message/Message.scala |   4 +-
 .../scala/kafka/producer/ProducerPool.scala     |   3 +-
 .../scala/kafka/producer/SyncProducer.scala     |   2 +-
 .../scala/kafka/security/auth/Operation.scala   |   2 -
 .../kafka/security/auth/PermissionType.scala    |   2 -
 .../security/auth/SimpleAclAuthorizer.scala     |   1 -
 .../main/scala/kafka/server/ConfigHandler.scala |   4 +-
 .../scala/kafka/server/DelayedOperation.scala   |   2 +-
 .../kafka/server/DelayedOperationKey.scala      |   1 -
 .../main/scala/kafka/server/KafkaConfig.scala   |   5 +-
 .../scala/kafka/tools/ConsoleConsumer.scala     |   9 +-
 .../scala/kafka/tools/ConsoleProducer.scala     |   5 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |   4 +-
 .../scala/kafka/tools/ExportZkOffsets.scala     |   2 +-
 .../main/scala/kafka/tools/GetOffsetShell.scala |   4 +-
 .../scala/kafka/tools/ImportZkOffsets.scala     |   1 -
 core/src/main/scala/kafka/tools/JmxTool.scala   |  10 +-
 .../scala/kafka/tools/ProducerPerformance.scala |   5 +-
 .../scala/kafka/tools/ReplayLogProducer.scala   |   2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  13 +-
 core/src/main/scala/kafka/utils/Implicits.scala |  49 ++++
 .../scala/kafka/utils/ReplicationUtils.scala    |   1 -
 .../src/main/scala/kafka/utils/ToolsUtils.scala |   9 +-
 .../kafka/api/AdminClientIntegrationTest.scala  |   8 +-
 .../kafka/api/BaseConsumerTest.scala            |   2 +-
 .../kafka/api/EndToEndClusterIdTest.scala       |   3 +-
 .../kafka/api/IntegrationTestHarness.scala      |   7 +-
 .../kafka/api/ProducerBounceTest.scala          |   3 +-
 .../api/SaslEndToEndAuthorizationTest.scala     |   3 +-
 .../api/SaslSslAdminClientIntegrationTest.scala |   2 +-
 ...tenersWithSameSecurityProtocolBaseTest.scala |   3 +-
 .../other/kafka/ReplicationQuotasTestRig.scala  |   3 +-
 .../other/kafka/TestLinearWriteSpeed.scala      |   4 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |   2 +-
 .../ZookeeperConsumerConnectorTest.scala        |   3 +-
 .../TransactionMarkerChannelManagerTest.scala   |   6 +-
 .../log/AbstractLogCleanerIntegrationTest.scala |   3 +-
 .../message/ByteBufferMessageSetTest.scala      |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  29 ++-
 .../test/scala/unit/kafka/utils/UtilsTest.scala |   8 +-
 73 files changed, 505 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 48f3f2f..f9862c6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -82,7 +82,7 @@ ext {
 
   maxPermSizeArgs = []
   if (!JavaVersion.current().isJava8Compatible())
-    maxPermSizeArgs = ['-XX:MaxPermSize=512m']
+    maxPermSizeArgs += '-XX:MaxPermSize=512m'
 
  userMaxForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : null
 
@@ -137,14 +137,24 @@ subprojects {
   apply plugin: 'maven'
   apply plugin: 'signing'
   apply plugin: 'checkstyle'
-  apply plugin: 'findbugs'
+
+  if (!JavaVersion.current().isJava9Compatible())
+    apply plugin: 'findbugs'
 
   sourceCompatibility = 1.7
+  targetCompatibility = 1.7
 
   compileJava {
     options.encoding = 'UTF-8'
-    // Add unchecked once we drop support for Java 7 as @SuppressWarnings("unchecked") is too buggy in Java 7
     options.compilerArgs << "-Xlint:deprecation"
+    // -Xlint:unchecked is too buggy in Java 7, so we only enable for Java 8 or higher
+    if (JavaVersion.current().isJava8Compatible())
+      options.compilerArgs << "-Xlint:unchecked"
+    // --release is the recommended way to select the target release, but it's only supported in Java 9 so we also
+    // set --source and --target via `sourceCompatibility` and `targetCompatibility`. If/when Gradle supports `--release`
+    // natively (https://github.com/gradle/gradle/issues/2510), we should switch to that.
+    if (JavaVersion.current().isJava9Compatible())
+      options.compilerArgs << "--release" << "7"
   }
 
   uploadArchives {
@@ -349,17 +359,19 @@ subprojects {
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
 
-  findbugs {
-    toolVersion = "3.0.1"
-    excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
-    ignoreFailures = false
-  }
-  test.dependsOn('findbugsMain')
+  if (!JavaVersion.current().isJava9Compatible()) {
+    findbugs {
+      toolVersion = "3.0.1"
+      excludeFilter = file("$rootDir/gradle/findbugs-exclude.xml")
+      ignoreFailures = false
+    }
+    test.dependsOn('findbugsMain')
 
-  tasks.withType(FindBugs) {
-    reports {
-      xml.enabled (project.hasProperty('xmlFindBugsReport'))
-      html.enabled (!project.hasProperty('xmlFindBugsReport'))
+    tasks.withType(FindBugs) {
+      reports {
+        xml.enabled(project.hasProperty('xmlFindBugsReport'))
+        html.enabled(!project.hasProperty('xmlFindBugsReport'))
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 4bd907b..8c3e3ae 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -98,7 +98,6 @@
       </subpackage>
       <subpackage name="scram">
         <allow pkg="javax.crypto" />
-        <allow pkg="javax.xml.bind" />
         <allow pkg="org.apache.kafka.common.errors" />
       </subpackage>
     </subpackage>
@@ -247,7 +246,6 @@
   <subpackage name="test">
     <allow pkg="org.apache.kafka" />
     <allow pkg="org.bouncycastle" />
-    <allow pkg="javax.xml.bind" />
   </subpackage>
 
   <subpackage name="connect">

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
index 51998a9..ae14244 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServerProvider.java
@@ -25,9 +25,10 @@ public class PlainSaslServerProvider extends Provider {
 
     private static final long serialVersionUID = 1L;
 
+    @SuppressWarnings("deprecation")
     protected PlainSaslServerProvider() {
         super("Simple SASL/PLAIN Server Provider", 1.0, "Simple SASL/PLAIN 
Server Provider for Kafka");
-        super.put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, 
PlainSaslServerFactory.class.getName());
+        put("SaslServerFactory." + PlainSaslServer.PLAIN_MECHANISM, 
PlainSaslServerFactory.class.getName());
     }
 
     public static void initialize() {

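For context, the deprecation suppressed here (and in the SCRAM providers
below) is the Provider(String, double, String) constructor, which Java 9
deprecates in favour of a String-versioned variant. A minimal sketch of the
registration pattern, using a hypothetical mechanism and factory name:

    import java.security.Provider

    // Registers a SASL server factory the way the Kafka providers do; the
    // double-versioned Provider constructor is deprecated as of Java 9.
    class ExampleSaslServerProvider extends Provider(
        "Example SASL Server Provider", 1.0, "Illustration only") {
      put("SaslServerFactory.EXAMPLE", "com.example.ExampleSaslServerFactory")
    }
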
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
index 8120c15..55b0651 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramCredentialUtils.java
@@ -19,9 +19,8 @@ package org.apache.kafka.common.security.scram;
 import java.util.Collection;
 import java.util.Properties;
 
-import javax.xml.bind.DatatypeConverter;
-
 import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.utils.Base64;
 
 /**
  * SCRAM Credential persistence utility functions. Implements format conversion used
@@ -41,11 +40,11 @@ public class ScramCredentialUtils {
     public static String credentialToString(ScramCredential credential) {
         return String.format("%s=%s,%s=%s,%s=%s,%s=%d",
                SALT,
-               DatatypeConverter.printBase64Binary(credential.salt()),
+               Base64.encoder().encodeToString(credential.salt()),
                STORED_KEY,
-               DatatypeConverter.printBase64Binary(credential.storedKey()),
+                Base64.encoder().encodeToString(credential.storedKey()),
                SERVER_KEY,
-               DatatypeConverter.printBase64Binary(credential.serverKey()),
+                Base64.encoder().encodeToString(credential.serverKey()),
                ITERATIONS,
                credential.iterations());
     }
@@ -56,9 +55,9 @@ public class ScramCredentialUtils {
                 !props.containsKey(SERVER_KEY) || !props.containsKey(ITERATIONS)) {
             throw new IllegalArgumentException("Credentials not valid: " + str);
         }
-        byte[] salt = DatatypeConverter.parseBase64Binary(props.getProperty(SALT));
-        byte[] storedKey = DatatypeConverter.parseBase64Binary(props.getProperty(STORED_KEY));
-        byte[] serverKey = DatatypeConverter.parseBase64Binary(props.getProperty(SERVER_KEY));
+        byte[] salt = Base64.decoder().decode(props.getProperty(SALT));
+        byte[] storedKey = Base64.decoder().decode(props.getProperty(STORED_KEY));
+        byte[] serverKey = Base64.decoder().decode(props.getProperty(SERVER_KEY));
         int iterations = Integer.parseInt(props.getProperty(ITERATIONS));
         return new ScramCredential(salt, storedKey, serverKey, iterations);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
index 6fd117d..1ad7266 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramMessages.java
@@ -16,12 +16,13 @@
  */
 package org.apache.kafka.common.security.scram;
 
+import org.apache.kafka.common.utils.Base64;
+
 import java.nio.charset.StandardCharsets;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import javax.security.sasl.SaslException;
-import javax.xml.bind.DatatypeConverter;
 
 /**
  * SCRAM request/response message creation and parsing based on
@@ -140,7 +141,7 @@ public class ScramMessages {
             }
             this.nonce = matcher.group("nonce");
             String salt = matcher.group("salt");
-            this.salt = DatatypeConverter.parseBase64Binary(salt);
+            this.salt = Base64.decoder().decode(salt);
         }
         public ServerFirstMessage(String clientNonce, String serverNonce, byte[] salt, int iterations) {
             this.nonce = clientNonce + serverNonce;
@@ -157,7 +158,7 @@ public class ScramMessages {
             return iterations;
         }
         String toMessage() {
-            return String.format("r=%s,s=%s,i=%d", nonce, 
DatatypeConverter.printBase64Binary(salt), iterations);
+            return String.format("r=%s,s=%s,i=%d", nonce, 
Base64.encoder().encodeToString(salt), iterations);
         }
     }
     /**
@@ -184,9 +185,9 @@ public class ScramMessages {
             if (!matcher.matches())
                 throw new SaslException("Invalid SCRAM client final message 
format: " + message);
 
-            this.channelBinding = 
DatatypeConverter.parseBase64Binary(matcher.group("channel"));
+            this.channelBinding = 
Base64.decoder().decode(matcher.group("channel"));
             this.nonce = matcher.group("nonce");
-            this.proof = 
DatatypeConverter.parseBase64Binary(matcher.group("proof"));
+            this.proof = Base64.decoder().decode(matcher.group("proof"));
         }
         public ClientFinalMessage(byte[] channelBinding, String nonce) {
             this.channelBinding = channelBinding;
@@ -206,13 +207,13 @@ public class ScramMessages {
         }
         public String clientFinalMessageWithoutProof() {
             return String.format("c=%s,r=%s",
-                    DatatypeConverter.printBase64Binary(channelBinding),
+                    Base64.encoder().encodeToString(channelBinding),
                     nonce);
         }
         String toMessage() {
             return String.format("%s,p=%s",
                     clientFinalMessageWithoutProof(),
-                    DatatypeConverter.printBase64Binary(proof));
+                    Base64.encoder().encodeToString(proof));
         }
     }
     /**
@@ -243,7 +244,7 @@ public class ScramMessages {
                 // ignore
             }
             if (error == null) {
-                this.serverSignature = DatatypeConverter.parseBase64Binary(matcher.group("signature"));
+                this.serverSignature = Base64.decoder().decode(matcher.group("signature"));
                 this.error = null;
             } else {
                 this.serverSignature = null;
@@ -264,7 +265,7 @@ public class ScramMessages {
             if (error != null)
                 return "e=" + error;
             else
-                return "v=" + 
DatatypeConverter.printBase64Binary(serverSignature);
+                return "v=" + Base64.encoder().encodeToString(serverSignature);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
index fac673e..d389f04 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslClientProvider.java
@@ -25,10 +25,11 @@ public class ScramSaslClientProvider extends Provider {
 
     private static final long serialVersionUID = 1L;
 
+    @SuppressWarnings("deprecation")
     protected ScramSaslClientProvider() {
         super("SASL/SCRAM Client Provider", 1.0, "SASL/SCRAM Client Provider 
for Kafka");
         for (ScramMechanism mechanism : ScramMechanism.values())
-            super.put("SaslClientFactory." + mechanism.mechanismName(), 
ScramSaslClientFactory.class.getName());
+            put("SaslClientFactory." + mechanism.mechanismName(), 
ScramSaslClientFactory.class.getName());
     }
 
     public static void initialize() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
index 2f768a3..9f2a6b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramSaslServerProvider.java
@@ -25,10 +25,11 @@ public class ScramSaslServerProvider extends Provider {
 
     private static final long serialVersionUID = 1L;
 
+    @SuppressWarnings("deprecation")
     protected ScramSaslServerProvider() {
         super("SASL/SCRAM Server Provider", 1.0, "SASL/SCRAM Server Provider 
for Kafka");
         for (ScramMechanism mechanism : ScramMechanism.values())
-            super.put("SaslServerFactory." + mechanism.mechanismName(), 
ScramSaslServerFactory.class.getName());
+            put("SaslServerFactory." + mechanism.mechanismName(), 
ScramSaslServerFactory.class.getName());
     }
 
     public static void initialize() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
new file mode 100644
index 0000000..e06e1ee
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.utils;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+
+/**
+ * Temporary class in order to support Java 7 and Java 9. `DatatypeConverter` is not in the base module of Java 9
+ * and `java.util.Base64` was only introduced in Java 8.
+ */
+public final class Base64 {
+
+    private static final Factory FACTORY;
+
+    static {
+        if (Java.IS_JAVA8_COMPATIBLE)
+            FACTORY = new Java8Factory();
+        else
+            FACTORY = new Java7Factory();
+    }
+
+    private Base64() {}
+
+    public static Encoder encoder() {
+        return FACTORY.encoder();
+    }
+
+    public static Encoder urlEncoderNoPadding() {
+        return FACTORY.urlEncoderNoPadding();
+    }
+
+    public static Decoder decoder() {
+        return FACTORY.decoder();
+    }
+
+    /* Contains a subset of methods from java.util.Base64.Encoder (introduced in Java 8) */
+    public interface Encoder {
+        String encodeToString(byte[] bytes);
+    }
+
+    /* Contains a subset of methods from java.util.Base64.Decoder (introduced in Java 8) */
+    public interface Decoder {
+        byte[] decode(String string);
+    }
+
+    private interface Factory {
+        Encoder urlEncoderNoPadding();
+        Encoder encoder();
+        Decoder decoder();
+    }
+
+    private static class Java8Factory implements Factory {
+
+        // Static final MethodHandles are optimised better by HotSpot
+        private static final MethodHandle URL_ENCODE_NO_PADDING;
+        private static final MethodHandle ENCODE;
+        private static final MethodHandle DECODE;
+
+        private static final Encoder URL_ENCODER_NO_PADDING;
+        private static final Encoder ENCODER;
+        private static final Decoder DECODER;
+
+        static {
+            try {
+                Class<?> base64Class = Class.forName("java.util.Base64");
+
+                MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+
+                Class<?> juEncoderClass = Class.forName("java.util.Base64$Encoder");
+
+                MethodHandle getEncoder = lookup.findStatic(base64Class, "getEncoder",
+                        MethodType.methodType(juEncoderClass));
+                Object juEncoder;
+                try {
+                    juEncoder = getEncoder.invoke();
+                } catch (Throwable throwable) {
+                    // Invoked method doesn't throw checked exceptions, so safe to cast
+                    throw (RuntimeException) throwable;
+                }
+                MethodHandle encode = lookup.findVirtual(juEncoderClass, "encodeToString",
+                        MethodType.methodType(String.class, byte[].class));
+                ENCODE = encode.bindTo(juEncoder);
+
+
+                MethodHandle getUrlEncoder = lookup.findStatic(base64Class, "getUrlEncoder",
+                        MethodType.methodType(juEncoderClass));
+                Object juUrlEncoderNoPassing;
+                try {
+                    juUrlEncoderNoPassing = lookup.findVirtual(juEncoderClass, "withoutPadding",
+                            MethodType.methodType(juEncoderClass)).invoke(getUrlEncoder.invoke());
+                } catch (Throwable throwable) {
+                    // Invoked method doesn't throw checked exceptions, so safe to cast
+                    throw (RuntimeException) throwable;
+                }
+                URL_ENCODE_NO_PADDING = encode.bindTo(juUrlEncoderNoPassing);
+
+                Class<?> juDecoderClass = Class.forName("java.util.Base64$Decoder");
+                MethodHandle getDecoder = lookup.findStatic(base64Class, "getDecoder",
+                        MethodType.methodType(juDecoderClass));
+                MethodHandle decode = lookup.findVirtual(juDecoderClass, "decode",
+                        MethodType.methodType(byte[].class, String.class));
+                try {
+                    DECODE = decode.bindTo(getDecoder.invoke());
+                } catch (Throwable throwable) {
+                    // Invoked method doesn't throw checked exceptions, so safe to cast
+                    throw (RuntimeException) throwable;
+                }
+
+                URL_ENCODER_NO_PADDING = new Encoder() {
+                    @Override
+                    public String encodeToString(byte[] bytes) {
+                        try {
+                            return (String) URL_ENCODE_NO_PADDING.invokeExact(bytes);
+                        } catch (Throwable throwable) {
+                            // Invoked method doesn't throw checked exceptions, so safe to cast
+                            throw (RuntimeException) throwable;
+                        }
+                    }
+                };
+
+                ENCODER = new Encoder() {
+                    @Override
+                    public String encodeToString(byte[] bytes) {
+                        try {
+                            return (String) ENCODE.invokeExact(bytes);
+                        } catch (Throwable throwable) {
+                            // Invoked method doesn't throw checked exceptions, so safe to cast
+                            throw (RuntimeException) throwable;
+                        }
+                    }
+                };
+
+                DECODER = new Decoder() {
+                    @Override
+                    public byte[] decode(String string) {
+                        try {
+                            return (byte[]) DECODE.invokeExact(string);
+                        } catch (Throwable throwable) {
+                            // Invoked method doesn't throw checked exceptions, so safe to cast
+                            throw (RuntimeException) throwable;
+                        }
+                    }
+                };
+
+            } catch (ReflectiveOperationException e) {
+                // Should never happen
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public Encoder urlEncoderNoPadding() {
+            return URL_ENCODER_NO_PADDING;
+        }
+
+        @Override
+        public Encoder encoder() {
+            return ENCODER;
+        }
+
+        @Override
+        public Decoder decoder() {
+            return DECODER;
+        }
+    }
+
+    private static class Java7Factory implements Factory {
+
+        // Static final MethodHandles are optimised better by HotSpot
+        private static final MethodHandle PRINT;
+        private static final MethodHandle PARSE;
+
+        static {
+            try {
+                Class<?> cls = Class.forName("javax.xml.bind.DatatypeConverter");
+                MethodHandles.Lookup lookup = MethodHandles.publicLookup();
+                PRINT = lookup.findStatic(cls, "printBase64Binary", MethodType.methodType(String.class,
+                        byte[].class));
+                PARSE = lookup.findStatic(cls, "parseBase64Binary", MethodType.methodType(byte[].class,
+                        String.class));
+            } catch (ReflectiveOperationException e) {
+                // Should never happen
+                throw new RuntimeException(e);
+            }
+        }
+
+        public static final Encoder URL_ENCODER_NO_PADDING = new Encoder() {
+
+            @Override
+            public String encodeToString(byte[] bytes) {
+                String base64EncodedUUID = Java7Factory.encodeToString(bytes);
+                //Convert to URL safe variant by replacing + and / with - and _ respectively.
+                String urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-")
+                        .replace("/", "_");
+                // Remove the "==" padding at the end.
+                return urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length() - 2);
+            }
+
+        };
+
+        public static final Encoder ENCODER = new Encoder() {
+            @Override
+            public String encodeToString(byte[] bytes) {
+                return Java7Factory.encodeToString(bytes);
+            }
+        };
+
+        public static final Decoder DECODER = new Decoder() {
+            @Override
+            public byte[] decode(String string) {
+                try {
+                    return (byte[]) PARSE.invokeExact(string);
+                } catch (Throwable throwable) {
+                    // Invoked method doesn't throw checked exceptions, so safe to cast
+                    throw (RuntimeException) throwable;
+                }
+            }
+        };
+
+        private static String encodeToString(byte[] bytes) {
+            try {
+                return (String) PRINT.invokeExact(bytes);
+            } catch (Throwable throwable) {
+                // Invoked method doesn't throw checked exceptions, so safe to cast
+                throw (RuntimeException) throwable;
+            }
+        }
+
+        @Override
+        public Encoder urlEncoderNoPadding() {
+            return URL_ENCODER_NO_PADDING;
+        }
+
+        @Override
+        public Encoder encoder() {
+            return ENCODER;
+        }
+
+        @Override
+        public Decoder decoder() {
+            return DECODER;
+        }
+    }
+}
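
A quick sketch of what Java7Factory's URL-safe encoder above does, runnable on
Java 8 where javax.xml.bind is still on the classpath. Note that dropping
exactly two trailing characters assumes inputs such as 16-byte UUIDs, whose
standard encoding always ends in "==":

    object UrlSafeSketch {
      def main(args: Array[String]): Unit = {
        val bytes = Array.fill[Byte](16)(42.toByte)
        val standard = javax.xml.bind.DatatypeConverter.printBase64Binary(bytes)
        // Swap the two URL-unsafe characters and drop the trailing "==".
        val urlSafe = standard.replace("+", "-").replace("/", "_").dropRight(2)
        println(s"$standard -> $urlSafe")
      }
    }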

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java b/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
index b547beb..dfe22e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Crc32C.java
@@ -32,7 +32,7 @@ import java.util.zip.Checksum;
  *
  * NOTE: This class is intended for INTERNAL usage only within Kafka.
  */
-public class Crc32C {
+public final class Crc32C {
 
     private static final ChecksumFactory CHECKSUM_FACTORY;
 
@@ -43,6 +43,8 @@ public class Crc32C {
             CHECKSUM_FACTORY = new PureJavaChecksumFactory();
     }
 
+    private Crc32C() {}
+
     /**
      * Compute the CRC32C (Castagnoli) of the segment of the byte array given by the specified size and offset
      *

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Java.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Java.java b/clients/src/main/java/org/apache/kafka/common/utils/Java.java
index b374c24..38d9541 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Java.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Java.java
@@ -41,6 +41,9 @@ public final class Java {
     public static final boolean IS_JAVA9_COMPATIBLE = JVM_MAJOR_VERSION > 1 ||
             (JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 9);
 
+    public static final boolean IS_JAVA8_COMPATIBLE = JVM_MAJOR_VERSION > 1 ||
+            (JVM_MAJOR_VERSION == 1 && JVM_MINOR_VERSION >= 8);
+
     public static boolean isIBMJdk() {
         return System.getProperty("java.vendor").contains("IBM");
     }

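A sketch of the check IS_JAVA8_COMPATIBLE performs, assuming the major/minor
fields are parsed from the java.specification.version system property, which
reads "1.8" on Java 8 but just "9" on Java 9:

    object JavaVersionSketch {
      def main(args: Array[String]): Unit = {
        val parts = System.getProperty("java.specification.version").split("\\.")
        val major = parts(0).toInt
        // Pre-9 releases report "1.x"; from Java 9 on there is no minor part.
        val minor = if (parts.length > 1) parts(1).toInt else 0
        val isJava8Compatible = major > 1 || (major == 1 && minor >= 8)
        println(s"major=$major minor=$minor java8Compatible=$isJava8Compatible")
      }
    }
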
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 75f8cf7..ee82f9a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -284,14 +284,14 @@ public class Utils {
      * Instantiate the class
      */
     public static <T> T newInstance(Class<T> c) {
+        if (c == null)
+            throw new KafkaException("class cannot be null");
         try {
-            return c.newInstance();
-        } catch (IllegalAccessException e) {
+            return c.getDeclaredConstructor().newInstance();
+        } catch (NoSuchMethodException e) {
+            throw new KafkaException("Could not find a public no-argument 
constructor for " + c.getName(), e);
+        } catch (ReflectiveOperationException | RuntimeException e) {
             throw new KafkaException("Could not instantiate class " + 
c.getName(), e);
-        } catch (InstantiationException e) {
-            throw new KafkaException("Could not instantiate class " + 
c.getName() + " Does it have a public no-argument constructor?", e);
-        } catch (NullPointerException e) {
-            throw new KafkaException("Requested class was null", e);
         }
     }
 

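On the Utils.newInstance change above: Class.newInstance is deprecated in
Java 9 because it propagates any checked exception the no-arg constructor
throws, bypassing compile-time checking; the Constructor returned by
getDeclaredConstructor() wraps such failures instead. A small sketch of the
replacement call:

    object NewInstanceSketch {
      def main(args: Array[String]): Unit = {
        // The Java 9 friendly replacement for clazz.newInstance().
        val sb = classOf[java.lang.StringBuilder].getDeclaredConstructor().newInstance()
        sb.append("instantiated reflectively")
        println(sb)
      }
    }
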
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
index 8f9bce5..f1ef740 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/TestDigestLoginModule.java
@@ -105,9 +105,10 @@ public class TestDigestLoginModule extends PlainLoginModule {
 
         private static final long serialVersionUID = 1L;
 
+        @SuppressWarnings("deprecation")
         protected DigestSaslServerProvider() {
             super("Test SASL/Digest-MD5 Server Provider", 1.0, "Test 
SASL/Digest-MD5 Server Provider for Kafka");
-            super.put("SaslServerFactory.DIGEST-MD5", 
TestDigestLoginModule.DigestSaslServerFactory.class.getName());
+            put("SaslServerFactory.DIGEST-MD5", 
TestDigestLoginModule.DigestSaslServerFactory.class.getName());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
index 371031b..a86e0dd 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramFormatterTest.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.common.security.scram;
 
+import org.apache.kafka.common.utils.Base64;
 import org.junit.Test;
 
-import javax.xml.bind.DatatypeConverter;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -55,13 +54,13 @@ public class ScramFormatterTest {
         String serverNonce = serverFirst.nonce().substring(clientNonce.length());
         assertEquals("%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0", serverNonce);
         byte[] salt = serverFirst.salt();
-        assertArrayEquals(DatatypeConverter.parseBase64Binary("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
+        assertArrayEquals(Base64.decoder().decode("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
         int iterations = serverFirst.iterations();
         assertEquals(4096, iterations);
         byte[] channelBinding = clientFinal.channelBinding();
-        assertArrayEquals(DatatypeConverter.parseBase64Binary("biws"), channelBinding);
+        assertArrayEquals(Base64.decoder().decode("biws"), channelBinding);
         byte[] serverSignature = serverFinal.serverSignature();
-        assertArrayEquals(DatatypeConverter.parseBase64Binary("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
+        assertArrayEquals(Base64.decoder().decode("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
 
         byte[] saltedPassword = formatter.saltedPassword(password, salt, iterations);
         byte[] serverKey = formatter.serverKey(saltedPassword);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
index de97ce2..89e6260 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/ScramMessagesTest.java
@@ -16,13 +16,13 @@
  */
 package org.apache.kafka.common.security.scram;
 
+import org.apache.kafka.common.utils.Base64;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
 
 import javax.security.sasl.SaslException;
-import javax.xml.bind.DatatypeConverter;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -288,11 +288,11 @@ public class ScramMessagesTest {
     }
 
     private String randomBytesAsString() {
-        return DatatypeConverter.printBase64Binary(formatter.secureRandomBytes());
+        return Base64.encoder().encodeToString(formatter.secureRandomBytes());
     }
 
     private byte[] toBytes(String base64Str) {
-        return DatatypeConverter.parseBase64Binary(base64Str);
+        return Base64.decoder().decode(base64Str);
     };
 
     private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
@@ -303,14 +303,14 @@ public class ScramMessagesTest {
 
     private void checkServerFirstMessage(ServerFirstMessage message, String nonce, String salt, int iterations) {
         assertEquals(nonce, message.nonce());
-        assertArrayEquals(DatatypeConverter.parseBase64Binary(salt), message.salt());
+        assertArrayEquals(Base64.decoder().decode(salt), message.salt());
         assertEquals(iterations, message.iterations());
     }
 
     private void checkClientFinalMessage(ClientFinalMessage message, String channelBinding, String nonce, String proof) {
-        assertArrayEquals(DatatypeConverter.parseBase64Binary(channelBinding), message.channelBinding());
+        assertArrayEquals(Base64.decoder().decode(channelBinding), message.channelBinding());
         assertEquals(nonce, message.nonce());
-        assertArrayEquals(DatatypeConverter.parseBase64Binary(proof), message.proof());
+        assertArrayEquals(Base64.decoder().decode(proof), message.proof());
     }
 
     private void checkServerFinalMessage(ServerFinalMessage message, String error, String serverSignature) {
@@ -318,7 +318,7 @@ public class ScramMessagesTest {
         if (serverSignature == null)
             assertNull("Unexpected server signature", message.serverSignature());
         else
-            assertArrayEquals(DatatypeConverter.parseBase64Binary(serverSignature), message.serverSignature());
+            assertArrayEquals(Base64.decoder().decode(serverSignature), message.serverSignature());
     }
 
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index a27d5cb..47ca823 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,11 +22,11 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.utils.Base64;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.xml.bind.DatatypeConverter;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -292,7 +292,7 @@ public class TestUtils {
 
         // Convert into normal variant and add padding at the end.
         String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+"));
-        byte[] decodedUuid = DatatypeConverter.parseBase64Binary(originalClusterId);
+        byte[] decodedUuid = Base64.decoder().decode(originalClusterId);
 
         // We expect 16 bytes, same as the input UUID.
         assertEquals(decodedUuid.length, 16);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/Kafka.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 1c16c96..7c5b420 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -20,6 +20,7 @@ package kafka
 import java.util.Properties
 
 import joptsimple.OptionParser
+import kafka.utils.Implicits._
 import kafka.server.{KafkaServer, KafkaServerStartable}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
 import org.apache.kafka.common.utils.Utils
@@ -47,7 +48,7 @@ object Kafka extends Logging {
        CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(","))
       }
 
-      props.putAll(CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala))
+      props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala)
     }
     props
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index b18dcc9..366667b 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -24,6 +24,7 @@ import kafka.common.InvalidConfigException
 import kafka.log.LogConfig
 import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig, QuotaId}
 import kafka.utils.{CommandLineUtils, ZkUtils}
+import kafka.utils.Implicits._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
 import org.apache.kafka.common.utils.Utils
@@ -95,7 +96,7 @@ object ConfigCommand extends Config {
     if (invalidConfigs.nonEmpty)
       throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
 
-    configs.putAll(configsToBeAdded)
+    configs ++= configsToBeAdded
     configsToBeDeleted.foreach(configs.remove(_))
 
     utils.changeConfigs(zkUtils, entityType, entityName, configs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index d1cd803..1100e87 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -25,6 +25,7 @@ import joptsimple.{OptionParser, OptionSpec}
 import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.client.ClientUtils
 import kafka.common.{OffsetMetadataAndError, TopicAndPartition}
+import kafka.utils.Implicits._
 import kafka.consumer.SimpleConsumer
 import kafka.utils._
 import org.I0Itec.zkclient.exception.ZkNoNodeException
@@ -514,7 +515,8 @@ object ConsumerGroupCommand extends Logging {
       properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
       properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
       properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
-      if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))
+      if (opts.options.has(opts.commandConfigOpt))
+        properties ++= Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
 
       new KafkaConsumer(properties)
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 882fe21..2d3a76c 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import joptsimple._
 import kafka.common.AdminCommandFailedException
+import kafka.utils.Implicits._
 import kafka.consumer.Whitelist
 import kafka.log.LogConfig
 import kafka.server.ConfigType
@@ -130,7 +131,7 @@ object TopicCommand extends Logging {
         val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
         val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
         // compile the final set of configs
-        configs.putAll(configsToBeAdded)
+        configs ++= configsToBeAdded
         configsToBeDeleted.foreach(config => configs.remove(config))
         AdminUtils.changeTopicConfig(zkUtils, topic, configs)
         println("Updated config for topic \"%s\".".format(topic))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
index e1d6e02..82b7dac 100644
--- a/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
+++ b/core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
@@ -62,7 +62,7 @@ object ZkSecurityMigrator extends Logging {
                       + "authentication.")
 
   def run(args: Array[String]) {
-    var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
+    val jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     val parser = new OptionParser(false)
     val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
         + " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/api/FetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchResponse.scala b/core/src/main/scala/kafka/api/FetchResponse.scala
index b5ef912..ae2f19c 100644
--- a/core/src/main/scala/kafka/api/FetchResponse.scala
+++ b/core/src/main/scala/kafka/api/FetchResponse.scala
@@ -33,7 +33,7 @@ object FetchResponsePartitionData {
     val messageSetSize = buffer.getInt
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
-    buffer.position(buffer.position + messageSetSize)
+    buffer.position(buffer.position() + messageSetSize)
     new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
   }
 

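A sketch of the parenless-accessor problem fixed in the position()/limit()
changes throughout this patch: JDK 9 adds covariant overrides such as
ByteBuffer.position(int) alongside the inherited zero-arg Buffer.position(),
which makes the parenless Scala form ambiguous (see the scala/bug#10418
comment linked in the summary), so call sites now spell out the empty
parameter list:

    import java.nio.ByteBuffer

    object BufferParensSketch {
      def main(args: Array[String]): Unit = {
        val buffer = ByteBuffer.allocate(16)
        buffer.putInt(42)
        // `buffer.position` without parens no longer resolves under JDK 9;
        // the explicit () selects the zero-arg accessor unambiguously.
        println(s"position=${buffer.position()} limit=${buffer.limit()}")
      }
    }
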
http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 921e011..9cdb14b 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -93,7 +93,7 @@ case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
           val partitionMessageData = partitionAndData._2
           val bytes = partitionMessageData.buffer
           buffer.putInt(partition)
-          buffer.putInt(bytes.limit)
+          buffer.putInt(bytes.limit())
           buffer.put(bytes)
           bytes.rewind
         })

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 780ae52..4b07751 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -764,7 +764,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, time: Time, met
       topicDeleted || successful
     }.keys
     reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
-    var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+    val partitionsToReassign = mutable.Map[TopicAndPartition, ReassignedPartitionsContext]()
     partitionsToReassign ++= partitionsBeingReassigned
     partitionsToReassign --= reassignedPartitions
     controllerContext.partitionsBeingReassigned ++= partitionsToReassign

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
index c9ec48a..051445c 100644
--- a/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
+++ b/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
@@ -17,7 +17,6 @@
 package kafka.javaapi
 
 import kafka.cluster.BrokerEndPoint
-import org.apache.kafka.common.protocol.Errors
 import scala.collection.JavaConverters._
 
 private[javaapi] object MetadataListImplicits {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index e3b2ec1..467d0a6 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -85,7 +85,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     val scalaReturn = underlying.consume(scalaTopicCountMap, keyDecoder, valueDecoder)
     val ret = new java.util.HashMap[String,java.util.List[KafkaStream[K,V]]]
     for ((topic, streams) <- scalaReturn) {
-      var javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
+      val javaStreamList = new java.util.ArrayList[KafkaStream[K,V]]
       for (stream <- streams)
         javaStreamList.add(stream)
       ret.put(topic, javaStreamList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index d569ad9..2d7cc7e 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.{File, IOException, RandomAccessFile}
+import java.io.{File, RandomAccessFile}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.nio.channels.FileChannel
 import java.util.concurrent.locks.{Lock, ReentrantLock}
@@ -69,7 +69,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
         idx.position(0)
       else
         // if this is a pre-existing index, assume it is valid and set position to last entry
-        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
+        idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
       idx
     } finally {
       CoreUtils.swallow(raf.close())
@@ -80,11 +80,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * The maximum number of entries this index can hold
    */
   @volatile
-  private[this] var _maxEntries = mmap.limit / entrySize
+  private[this] var _maxEntries = mmap.limit() / entrySize
 
   /** The number of entries in this index */
   @volatile
-  protected var _entries = mmap.position / entrySize
+  protected var _entries = mmap.position() / entrySize
 
   /**
    * True iff there are no more slots available in this index
@@ -105,7 +105,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     inLock(lock) {
       val raf = new RandomAccessFile(file, "rw")
       val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
-      val position = mmap.position
+      val position = mmap.position()
 
       /* Windows won't let us modify the file length while the file is mmapped :-( */
       if (OperatingSystem.IS_WINDOWS)
@@ -113,7 +113,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       try {
         raf.setLength(roundedNewSize)
         mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
-        _maxEntries = mmap.limit / entrySize
+        _maxEntries = mmap.limit() / entrySize
         mmap.position(position)
       } finally {
         CoreUtils.swallow(raf.close())

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 85d6487..4f53b41 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -550,7 +550,7 @@ private[log] class Cleaner(val id: Int,
 
       // if any messages are to be retained, write them out
       val outputBuffer = result.output
-      if (outputBuffer.position > 0) {
+      if (outputBuffer.position() > 0) {
         outputBuffer.flip()
         val retained = MemoryRecords.readableRecords(outputBuffer)
         dest.append(firstOffset = retained.batches.iterator.next().baseOffset,
@@ -558,11 +558,11 @@ private[log] class Cleaner(val id: Int,
           largestTimestamp = result.maxTimestamp,
           shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp,
           records = retained)
-        throttler.maybeThrottle(outputBuffer.limit)
+        throttler.maybeThrottle(outputBuffer.limit())
       }
 
       // if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
-      if (readBuffer.limit > 0 && result.messagesRead == 0)
+      if (readBuffer.limit() > 0 && result.messagesRead == 0)
         growBuffers(maxLogMessageSize)
     }
     restoreBuffers()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 8f82e65..c47d229 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import kafka.api.ApiVersion
 import kafka.message.{BrokerCompressionCodec, Message}
 import kafka.server.{KafkaConfig, ThrottledReplicaListValidator}
+import kafka.utils.Implicits._
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
 import org.apache.kafka.common.record.TimestampType
@@ -269,8 +270,8 @@ object LogConfig {
    */
   def fromProps(defaults: java.util.Map[_ <: Object, _ <: Object], overrides: Properties): LogConfig = {
     val props = new Properties()
-    props.putAll(defaults)
-    props.putAll(overrides)
+    defaults.asScala.foreach { case (k, v) => props.put(k, v) }
+    props ++= overrides
     LogConfig(props)
   }
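
Note: `defaults` is a wildcard-typed java.util.Map, which fits neither `++=`
overload added in kafka.utils.Implicits later in this patch (one takes Properties,
the other collection.Map[String, AnyRef]), hence the explicit per-entry copy. The
same pattern in isolation, as a sketch with a hypothetical helper name:

    import java.util.Properties
    import scala.collection.JavaConverters._

    // Copy a wildcard-typed map into Properties without calling putAll,
    // which scalac reports as ambiguous against Java 9 (scala/bug#10418).
    def copyToProps(defaults: java.util.Map[_ <: Object, _ <: Object]): Properties = {
      val props = new Properties()
      defaults.asScala.foreach { case (k, v) => props.put(k, v) }
      props
    }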
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 53c18fe..c156972 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -58,7 +58,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   private[this] var _lastOffset = lastEntry.offset
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries 
= %d, lastOffset = %d, file position = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, 
_lastOffset, mmap.position))
+    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, 
_lastOffset, mmap.position()))
 
   /**
    * The last entry in the index
@@ -144,7 +144,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
         mmap.putInt(position)
         _entries += 1
         _lastOffset = offset
-        require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+        require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
       } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
           .format(offset, entries, _lastOffset, file.getAbsolutePath))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 8b493c2..219bed3 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -149,7 +149,7 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
     this.lookups = 0L
     this.probes = 0L
     this.lastOffset = -1L
-    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit, 0.toByte)
+    Arrays.fill(bytes.array, bytes.arrayOffset, bytes.arrayOffset + bytes.limit(), 0.toByte)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index ce56a6c..974a50e 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -338,12 +338,12 @@ object ProducerStateManager {
     buffer.flip()
 
     // now fill in the CRC
-    val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit - ProducerEntriesOffset)
+    val crc = Crc32C.compute(buffer, ProducerEntriesOffset, buffer.limit() - ProducerEntriesOffset)
     ByteUtils.writeUnsignedInt(buffer, CrcOffset, crc)
 
     val fos = new FileOutputStream(file)
     try {
-      fos.write(buffer.array, buffer.arrayOffset, buffer.limit)
+      fos.write(buffer.array, buffer.arrayOffset, buffer.limit())
     } finally {
       fos.close()
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index 6c9c32b..aab9300 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -126,7 +126,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
         mmap.putLong(timestamp)
         mmap.putInt((offset - baseOffset).toInt)
         _entries += 1
-        require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+        require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index c6fa1ce..62e2125 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -174,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   /**
    * The total number of bytes in this message set, including any partial trailing messages
    */
-  def sizeInBytes: Int = buffer.limit
+  def sizeInBytes: Int = buffer.limit()
 
   /**
    * The total number of bytes in this message set not including any partial, trailing messages

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/message/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/Message.scala b/core/src/main/scala/kafka/message/Message.scala
index 3530929..a469901 100755
--- a/core/src/main/scala/kafka/message/Message.scala
+++ b/core/src/main/scala/kafka/message/Message.scala
@@ -222,7 +222,7 @@ class Message(val buffer: ByteBuffer,
    * Compute the checksum of the message from the message contents
    */
   def computeChecksum: Long =
-    Crc32.crc32(buffer, MagicOffset, buffer.limit - MagicOffset)
+    Crc32.crc32(buffer, MagicOffset, buffer.limit() - MagicOffset)
   
   /**
    * Retrieve the previously computed CRC for this message
@@ -245,7 +245,7 @@ class Message(val buffer: ByteBuffer,
   /**
    * The complete serialized size of this message in bytes (including crc, header attributes, etc)
    */
-  def size: Int = buffer.limit
+  def size: Int = buffer.limit()
 
   /**
    * The position where the key size is stored.

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/producer/ProducerPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala
index 60cef63..6d4e4b7 100644
--- a/core/src/main/scala/kafka/producer/ProducerPool.scala
+++ b/core/src/main/scala/kafka/producer/ProducerPool.scala
@@ -23,6 +23,7 @@ import kafka.api.TopicMetadata
 import kafka.cluster.BrokerEndPoint
 import kafka.common.UnavailableProducerException
 import kafka.utils.Logging
+import kafka.utils.Implicits._
 
 import scala.collection.mutable.HashMap
 
@@ -35,7 +36,7 @@ object ProducerPool {
     val props = new Properties()
     props.put("host", broker.host)
     props.put("port", broker.port.toString)
-    props.putAll(config.props.props)
+    props ++= config.props.props
     new SyncProducer(new SyncProducerConfig(props))
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index f02648f..04527c8 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -58,7 +58,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
      */
     if (logger.isDebugEnabled) {
       val buffer = new RequestOrResponseSend("", request).buffer
-      trace("verifying sendbuffer of size " + buffer.limit)
+      trace("verifying sendbuffer of size " + buffer.limit())
       val requestTypeId = buffer.getShort()
       if(requestTypeId == ApiKeys.PRODUCE.id) {
         val request = ProducerRequest.readFrom(buffer)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala
index a13345a..0fa311a 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -19,8 +19,6 @@ package kafka.security.auth
 import kafka.common.{BaseEnum, KafkaException}
 import org.apache.kafka.common.acl.AclOperation
 
-import scala.util.{Failure, Success, Try}
-
 /**
  * Different operations a client may perform on kafka resources.
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/security/auth/PermissionType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala
index c603351..c75e6f6 100644
--- a/core/src/main/scala/kafka/security/auth/PermissionType.scala
+++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala
@@ -19,8 +19,6 @@ package kafka.security.auth
 import kafka.common.{BaseEnum, KafkaException}
 import org.apache.kafka.common.acl.AclPermissionType
 
-import scala.util.{Failure, Success, Try}
-
 sealed trait PermissionType extends BaseEnum {
   val toJava: AclPermissionType
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 03eb9e3..6218a2c 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -26,7 +26,6 @@ import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
-import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import scala.collection.JavaConverters._
 import org.apache.log4j.Logger

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 149a1c1..79ffde8 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -25,6 +25,7 @@ import kafka.log.{LogConfig, LogManager}
 import kafka.security.CredentialProvider
 import kafka.server.Constants._
 import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.config.ConfigDef.Validator
 import org.apache.kafka.common.config.ConfigException
@@ -32,7 +33,6 @@ import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 /**
   * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
@@ -55,7 +55,7 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC
     if (logs.nonEmpty) {
       /* combine the default properties with the overrides in zk to create the new LogConfig */
       val props = new Properties()
-      props.putAll(logManager.defaultConfig.originals)
+      props ++= logManager.defaultConfig.originals.asScala
       topicConfig.asScala.foreach { case (key, value) =>
         if (!configNamesToExclude.contains(key)) props.put(key, value)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 4ae1b13..8997395 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -349,7 +349,7 @@ final class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: Stri
 
     def cancel(): List[T] = {
       val iter = operations.iterator()
-      var cancelled = new ListBuffer[T]()
+      val cancelled = new ListBuffer[T]()
       while (iter.hasNext) {
         val curr = iter.next()
         curr.cancel()

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 1933339..bfa7fc2 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import kafka.common.TopicAndPartition
 import org.apache.kafka.common.TopicPartition
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a900e6d..89ba641 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,6 +26,7 @@ import kafka.coordinator.group.OffsetConfig
 import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
+import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef.ValidList
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
@@ -875,8 +876,8 @@ object KafkaConfig {
 
   def fromProps(defaults: Properties, overrides: Properties, doLog: Boolean): KafkaConfig = {
     val props = new Properties()
-    props.putAll(defaults)
-    props.putAll(overrides)
+    props ++= defaults
+    props ++= overrides
     fromProps(props, doLog)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index e81dba2..a0818bc 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -29,6 +29,7 @@ import kafka.consumer._
 import kafka.message._
 import kafka.metrics.KafkaMetricsReporter
 import kafka.utils._
+import kafka.utils.Implicits._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
 import org.apache.kafka.common.errors.WakeupException
 import org.apache.kafka.common.record.TimestampType
@@ -173,8 +174,8 @@ object ConsoleConsumer extends Logging {
   def getOldConsumerProps(config: ConsumerConfig): Properties = {
     val props = new Properties
 
-    props.putAll(config.consumerProps)
-    props.putAll(config.extraConsumerProps)
+    props ++= config.consumerProps
+    props ++= config.extraConsumerProps
     setAutoOffsetResetValue(config, props)
     props.put("zookeeper.connect", config.zkConnectionStr)
 
@@ -201,8 +202,8 @@ object ConsoleConsumer extends Logging {
   def getNewConsumerProps(config: ConsumerConfig): Properties = {
     val props = new Properties
 
-    props.putAll(config.consumerProps)
-    props.putAll(config.extraConsumerProps)
+    props ++= config.consumerProps
+    props ++= config.extraConsumerProps
     setAutoOffsetResetValue(config, props)
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 1b22140..39bb0ff 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -21,6 +21,7 @@ import kafka.common._
 import kafka.message._
 import kafka.serializer._
 import kafka.utils.{CommandLineUtils, Exit, ToolsUtils}
+import kafka.utils.Implicits._
 import kafka.producer.{NewShinyProducer, OldProducer}
 import java.util.Properties
 import java.io._
@@ -74,7 +75,7 @@ object ConsoleProducer {
   def getReaderProps(config: ProducerConfig): Properties = {
     val props = new Properties
     props.put("topic",config.topic)
-    props.putAll(config.cmdLineProps)
+    props ++= config.cmdLineProps
     props
   }
 
@@ -106,7 +107,7 @@ object ConsoleProducer {
       if (config.options.has(config.producerConfigOpt))
         Utils.loadProps(config.options.valueOf(config.producerConfigOpt))
       else new Properties
-    props.putAll(config.extraProducerProps)
+    props ++= config.extraProducerProps
     props
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 7bec15f..025617f 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -443,14 +443,14 @@ object DumpLogSegments {
     }
 
     def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) {
-      var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      val outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
       if (outOfOrderSeq.isEmpty)
         outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq)
       outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp))
     }
 
     def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) {
-      var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      val shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
       if (shallowOffsetNotFoundSeq.isEmpty)
         shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq)
       shallowOffsetNotFoundSeq += ((indexOffset, logOffset))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
index 49593c2..d8ce9b0 100644
--- a/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
@@ -17,7 +17,7 @@
 
 package kafka.tools
 
-import java.io.{FileOutputStream, FileWriter, OutputStreamWriter}
+import java.io.{FileOutputStream, OutputStreamWriter}
 import java.nio.charset.StandardCharsets
 
 import joptsimple._

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index f06c412..4104ded 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -71,8 +71,8 @@ object GetOffsetShell {
     ToolsUtils.validatePortOrDie(parser, brokerList)
     val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
     val topic = options.valueOf(topicOpt)
-    var partitionList = options.valueOf(partitionOpt)
-    var time = options.valueOf(timeOpt).longValue
+    val partitionList = options.valueOf(partitionOpt)
+    val time = options.valueOf(timeOpt).longValue
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
     val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
index d96569b..c345f94 100644
--- a/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
+++ b/core/src/main/scala/kafka/tools/ImportZkOffsets.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
 
 import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, Logging, ZkUtils}
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.security.JaasUtils
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/JmxTool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala
index 3273821..c122141 100644
--- a/core/src/main/scala/kafka/tools/JmxTool.scala
+++ b/core/src/main/scala/kafka/tools/JmxTool.scala
@@ -28,7 +28,7 @@ import joptsimple.OptionParser
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.math._
-import kafka.utils.{CommandLineUtils , Exit, Logging}
+import kafka.utils.{CommandLineUtils, Exit, Logging}
 
 
 /**
@@ -177,14 +177,14 @@ object JmxTool extends Logging {
   }
 
   def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesWhitelist: Option[Array[String]]) = {
-    var attributes = new mutable.HashMap[String, Any]()
-    for(name <- names) {
+    val attributes = new mutable.HashMap[String, Any]()
+    for (name <- names) {
       val mbean = mbsc.getMBeanInfo(name)
-      for(attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
+      for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) {
         val attr = attrObj.asInstanceOf[Attribute]
         attributesWhitelist match {
           case Some(allowedAttributes) =>
-            if(allowedAttributes.contains(attr.getName))
+            if (allowedAttributes.contains(attr.getName))
               attributes(name + ":" + attr.getName) = attr.getValue
           case None => attributes(name + ":" + attr.getName) = attr.getValue
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index 0f21831..77f560b 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -20,6 +20,7 @@ package kafka.tools
 import kafka.metrics.KafkaMetricsReporter
 import kafka.producer.{NewShinyProducer, OldProducer}
 import kafka.utils.{CommandLineUtils, Exit, Logging, ToolsUtils, VerifiableProperties}
+import kafka.utils.Implicits._
 import kafka.message.CompressionCodec
 import kafka.serializer._
 import java.util.concurrent.{CountDownLatch, Executors}
@@ -205,7 +206,7 @@ object ProducerPerformance extends Logging {
     val producer =
       if (config.useNewProducer) {
         import org.apache.kafka.clients.producer.ProducerConfig
-        props.putAll(config.producerProps)
+        props ++= config.producerProps
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
         props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
@@ -217,7 +218,7 @@ object ProducerPerformance extends Logging {
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
         new NewShinyProducer(props)
       } else {
-        props.putAll(config.producerProps)
+        props ++= config.producerProps
         props.put("metadata.broker.list", config.brokerList)
         props.put("compression.codec", config.compressionCodec.codec.toString)
         props.put("send.buffer.bytes", (64 * 1024).toString)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
index 5d4cc23..ca9c111 100644
--- a/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
+++ b/core/src/main/scala/kafka/tools/ReplayLogProducer.scala
@@ -18,7 +18,7 @@
 package kafka.tools
 
 import joptsimple.OptionParser
-import java.util.concurrent.{Executors, CountDownLatch}
+import java.util.concurrent.CountDownLatch
 import java.util.Properties
 import kafka.consumer._
 import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 0e8855c..ca753d5 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -24,7 +24,6 @@ import java.util.concurrent.locks.{Lock, ReadWriteLock}
 import java.lang.management._
 import java.util.{Properties, UUID}
 import javax.management._
-import javax.xml.bind.DatatypeConverter
 
 import org.apache.kafka.common.protocol.SecurityProtocol
 
@@ -32,7 +31,7 @@ import scala.collection._
 import scala.collection.mutable
 import kafka.cluster.EndPoint
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.utils.{KafkaThread, Utils}
+import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
 
 /**
  * General helper functions!
@@ -279,7 +278,7 @@ object CoreUtils extends Logging {
 
   def generateUuidAsBase64(): String = {
     val uuid = UUID.randomUUID()
-    urlSafeBase64EncodeNoPadding(getBytesFromUuid(uuid))
+    Base64.urlEncoderNoPadding.encodeToString(getBytesFromUuid(uuid))
   }
 
   def getBytesFromUuid(uuid: UUID): Array[Byte] = {
@@ -290,14 +289,6 @@ object CoreUtils extends Logging {
     uuidBytes.array
   }
 
-  def urlSafeBase64EncodeNoPadding(data: Array[Byte]): String = {
-    val base64EncodedUUID = DatatypeConverter.printBase64Binary(data)
-    //Convert to URL safe variant by replacing + and / with - and _ respectively.
-    val urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_")
-    // Remove the "==" padding at the end.
-    urlSafeBase64EncodedUUID.substring(0, urlSafeBase64EncodedUUID.length - 2)
-  }
-
   def propsWith(key: String, value: String): Properties = {
     propsWith((key, value))
   }
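
Note: the removed helper is subsumed by the new org.apache.kafka.common.utils.Base64
class added in this patch, which avoids javax.xml.bind.DatatypeConverter since the
java.xml.bind module is not resolved by default in Java 9. Roughly equivalent
behavior on Java 8+, as a sketch:

    import java.nio.ByteBuffer
    import java.util.UUID

    def generateUuidAsBase64(): String = {
      val uuid = UUID.randomUUID()
      val buf = ByteBuffer.wrap(new Array[Byte](16))
      buf.putLong(uuid.getMostSignificantBits)
      buf.putLong(uuid.getLeastSignificantBits)
      // URL-safe alphabet (- and _) with the trailing padding dropped,
      // matching what the removed string-replacement code produced
      java.util.Base64.getUrlEncoder.withoutPadding.encodeToString(buf.array)
    }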

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Implicits.scala b/core/src/main/scala/kafka/utils/Implicits.scala
new file mode 100644
index 0000000..5196d45
--- /dev/null
+++ b/core/src/main/scala/kafka/utils/Implicits.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+/**
+  * In order to have these implicits in scope, add the following import:
+  *
+  * `import kafka.utils.Implicits._`
+  */
+object Implicits {
+
+  /**
+   * The java.util.Properties.putAll override introduced in Java 9 is seen as an overload by the
+   * Scala compiler, causing ambiguity errors in some cases. The `++=` methods introduced via
+   * implicits provide a concise alternative.
+   *
+   * See https://github.com/scala/bug/issues/10418 for more details.
+   */
+  implicit class PropertiesOps(properties: Properties) {
+
+    def ++=(props: Properties): Unit =
+      (properties: util.Hashtable[AnyRef, AnyRef]).putAll(props)
+
+    def ++=(map: collection.Map[String, AnyRef]): Unit =
+      (properties: util.Hashtable[AnyRef, AnyRef]).putAll(map.asJava)
+
+  }
+
+}
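
Example usage, mirroring the call sites updated elsewhere in this patch (property
values made up for illustration):

    import java.util.Properties
    import kafka.utils.Implicits._

    val defaults = new Properties()
    defaults.put("compression.type", "producer")

    val props = new Properties()
    props ++= defaults                                            // Properties overload
    props ++= Map[String, AnyRef]("retention.ms" -> "86400000")   // scala Map overload

The upcast to util.Hashtable[AnyRef, AnyRef] inside each method statically selects
Hashtable.putAll rather than the new Properties.putAll override, which is what
sidesteps the ambiguity.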

http://git-wip-us.apache.org/repos/asf/kafka/blob/ed96523a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 9533ce9..cc08055 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,7 +18,6 @@
 package kafka.utils
 
 import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
 import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
 import kafka.utils.ZkUtils._
 import org.apache.kafka.common.TopicPartition
