MINOR: Use SecurityProtocol in AuthenticationContext

Since we removed the unused `TRACE` option from `SecurityProtocol`, it now 
seems safer to expose the enum from `AuthenticationContext`. Additionally, this 
patch publishes the javadocs for the `security.auth` package and relocates the 
`Login` and `AuthCallbackHandler` interfaces to the non-public 
`security.authenticator` package.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Manikumar Reddy 
<manikumar.re...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3863 from hachikuji/use-security-protocol-in-auth-context


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5383f9be
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5383f9be
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5383f9be

Branch: refs/heads/trunk
Commit: 5383f9bed0ec4fda86b74a94d9d1ba595a2a1c8a
Parents: 198302f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Oct 4 09:20:21 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Oct 4 09:20:21 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  1 +
 .../org/apache/kafka/clients/ClientUtils.java   |  2 +-
 .../kafka/clients/CommonClientConfigs.java      |  2 +-
 .../kafka/common/network/ChannelBuilders.java   |  2 +-
 .../kafka/common/network/ListenerName.java      |  2 +-
 .../common/network/SaslChannelBuilder.java      |  2 +-
 .../kafka/common/protocol/SecurityProtocol.java | 75 --------------------
 .../kafka/common/requests/RequestContext.java   |  2 +-
 .../common/requests/UpdateMetadataRequest.java  |  2 +-
 .../security/auth/AuthCallbackHandler.java      | 45 ------------
 .../security/auth/AuthenticationContext.java    |  3 +-
 .../kafka/common/security/auth/Login.java       | 57 ---------------
 .../auth/PlaintextAuthenticationContext.java    |  6 +-
 .../auth/SaslAuthenticationContext.java         |  6 +-
 .../common/security/auth/SecurityProtocol.java  | 75 ++++++++++++++++++++
 .../security/auth/SslAuthenticationContext.java |  6 +-
 .../security/authenticator/AbstractLogin.java   |  1 -
 .../authenticator/AuthCallbackHandler.java      | 45 ++++++++++++
 .../common/security/authenticator/Login.java    | 57 +++++++++++++++
 .../security/authenticator/LoginManager.java    |  1 -
 .../authenticator/SaslClientAuthenticator.java  |  1 -
 .../SaslClientCallbackHandler.java              |  1 -
 .../authenticator/SaslServerAuthenticator.java  |  3 +-
 .../SaslServerCallbackHandler.java              |  1 -
 .../scram/ScramServerCallbackHandler.java       |  2 +-
 .../apache/kafka/common/network/EchoServer.java |  2 +-
 .../kafka/common/network/NetworkTestUtils.java  |  2 +-
 .../kafka/common/network/NioEchoServer.java     |  2 +-
 .../common/network/SaslChannelBuilderTest.java  |  2 +-
 .../kafka/common/network/SelectorTest.java      |  2 +-
 .../kafka/common/network/SslSelectorTest.java   |  2 +-
 .../common/network/SslTransportLayerTest.java   |  2 +-
 .../common/requests/RequestContextTest.java     |  2 +-
 .../common/requests/RequestResponseTest.java    |  2 +-
 .../auth/DefaultKafkaPrincipalBuilderTest.java  |  1 -
 .../ClientAuthenticationFailureTest.java        |  2 +-
 .../authenticator/SaslAuthenticatorTest.java    |  2 +-
 .../SaslServerAuthenticatorTest.java            |  2 +-
 .../kafka/admin/ConsumerGroupCommand.scala      |  3 +-
 .../main/scala/kafka/client/ClientUtils.scala   |  4 +-
 core/src/main/scala/kafka/cluster/Broker.scala  |  2 +-
 .../src/main/scala/kafka/cluster/EndPoint.scala |  2 +-
 .../controller/ControllerChannelManager.scala   |  3 +-
 .../main/scala/kafka/network/SocketServer.scala |  2 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  2 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |  2 +-
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |  2 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  3 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  7 +-
 .../kafka/api/BaseProducerSendTest.scala        |  2 +-
 .../kafka/api/EndToEndClusterIdTest.scala       | 16 ++---
 .../api/GroupCoordinatorIntegrationTest.scala   |  2 +-
 .../integration/kafka/api/MetricsTest.scala     |  3 +-
 .../kafka/api/PlaintextConsumerTest.scala       |  2 +-
 .../PlaintextEndToEndAuthorizationTest.scala    |  3 +-
 .../kafka/api/ProducerCompressionTest.scala     |  4 +-
 .../SaslClientsWithInvalidCredentialsTest.scala | 11 ++-
 .../api/SaslEndToEndAuthorizationTest.scala     |  2 +-
 .../api/SaslMultiMechanismConsumerTest.scala    |  2 +-
 .../api/SaslPlainPlaintextConsumerTest.scala    |  2 +-
 .../kafka/api/SaslPlaintextConsumerTest.scala   |  2 +-
 .../SaslScramSslEndToEndAuthorizationTest.scala |  1 -
 .../api/SaslSslAdminClientIntegrationTest.scala |  3 +-
 .../kafka/api/SaslSslConsumerTest.scala         |  2 +-
 .../integration/kafka/api/SslConsumerTest.scala |  2 +-
 .../api/SslEndToEndAuthorizationTest.scala      |  3 +-
 .../kafka/api/SslProducerSendTest.scala         |  2 +-
 .../kafka/api/TransactionsBounceTest.scala      |  5 +-
 .../kafka/api/TransactionsTest.scala            |  2 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |  4 +-
 .../integration/kafka/api/UserQuotaTest.scala   |  2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  2 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  2 +-
 .../api/RequestResponseSerializationTest.scala  |  3 +-
 .../unit/kafka/cluster/BrokerEndPointTest.scala |  2 +-
 .../integration/KafkaServerTestHarness.scala    |  3 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  1 -
 .../unit/kafka/network/SocketServerTest.scala   |  6 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |  3 +-
 .../unit/kafka/server/AdvertiseBrokerTest.scala |  2 +-
 .../unit/kafka/server/BaseRequestTest.scala     |  3 +-
 .../kafka/server/ClientQuotaManagerTest.scala   |  1 -
 .../unit/kafka/server/EdgeCaseRequestTest.scala |  5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  4 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |  2 +-
 .../unit/kafka/server/LeaderElectionTest.scala  |  3 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  3 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |  2 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |  4 +-
 .../server/SaslApiVersionsRequestTest.scala     |  3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  2 +-
 docs/upgrade.html                               |  2 +
 .../kafka/tools/VerifiableLog4jAppender.java    |  2 +-
 93 files changed, 290 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4b7c10c..7c93a45 100644
--- a/build.gradle
+++ b/build.gradle
@@ -834,6 +834,7 @@ project(':clients') {
     include "**/org/apache/kafka/common/resource/*"
     include "**/org/apache/kafka/common/serialization/*"
     include "**/org/apache/kafka/common/config/*"
+    include "**/org/apache/kafka/common/security/auth/*"
     include "**/org/apache/kafka/server/policy/*"
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 4612322..ea4c4db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index c54cb13..7b9e0f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 785c671..42723ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java 
b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
index 07ce7de..9da4cca 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.Locale;
 import java.util.Objects;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 5e1e407..e4eb791 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.authenticator.LoginManager;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
deleted file mode 100644
index c155481..0000000
--- 
a/clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-public enum SecurityProtocol {
-    /** Un-authenticated, non-encrypted channel */
-    PLAINTEXT(0, "PLAINTEXT"),
-    /** SSL channel */
-    SSL(1, "SSL"),
-    /** SASL authenticated, non-encrypted channel */
-    SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
-    /** SASL authenticated, SSL channel */
-    SASL_SSL(3, "SASL_SSL");
-
-    private static final Map<Short, SecurityProtocol> 
CODE_TO_SECURITY_PROTOCOL;
-    private static final List<String> NAMES;
-
-    static {
-        SecurityProtocol[] protocols = SecurityProtocol.values();
-        List<String> names = new ArrayList<>(protocols.length);
-        Map<Short, SecurityProtocol> codeToSecurityProtocol = new 
HashMap<>(protocols.length);
-        for (SecurityProtocol proto : protocols) {
-            codeToSecurityProtocol.put(proto.id, proto);
-            names.add(proto.name);
-        }
-        CODE_TO_SECURITY_PROTOCOL = 
Collections.unmodifiableMap(codeToSecurityProtocol);
-        NAMES = Collections.unmodifiableList(names);
-    }
-
-    /** The permanent and immutable id of a security protocol -- this can't 
change, and must match kafka.cluster.SecurityProtocol  */
-    public final short id;
-
-    /** Name of the security protocol. This may be used by client 
configuration. */
-    public final String name;
-
-    SecurityProtocol(int id, String name) {
-        this.id = (short) id;
-        this.name = name;
-    }
-
-    public static List<String> names() {
-        return NAMES;
-    }
-
-    public static SecurityProtocol forId(short id) {
-        return CODE_TO_SECURITY_PROTOCOL.get(id);
-    }
-
-    /** Case insensitive lookup by protocol name */
-    public static SecurityProtocol forName(String name) {
-        return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java 
b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
index 5132202..232c18a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
@@ -20,9 +20,9 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 6c36bda..da48e9f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -21,11 +21,11 @@ import 
org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Utils;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
deleted file mode 100644
index bfee577..0000000
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthCallbackHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.security.auth;
-
-import java.util.Map;
-
-import org.apache.kafka.common.network.Mode;
-
-import javax.security.auth.Subject;
-import javax.security.auth.callback.CallbackHandler;
-
-/*
- * Callback handler for SASL-based authentication
- */
-public interface AuthCallbackHandler extends CallbackHandler {
-
-    /**
-     * Configures this callback handler.
-     *
-     * @param configs Configuration
-     * @param mode The mode that indicates if this is a client or server 
connection
-     * @param subject Subject from login context
-     * @param saslMechanism Negotiated SASL mechanism
-     */
-    void configure(Map<String, ?> configs, Mode mode, Subject subject, String 
saslMechanism);
-
-    /**
-     * Closes this instance.
-     */
-    void close();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
index 8c82954..b8c0847 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java
@@ -25,9 +25,8 @@ import java.net.InetAddress;
 public interface AuthenticationContext {
     /**
      * Underlying security protocol of the authentication session.
-     * @return The name of the security protocol (i.e. PLAINTEXT, 
SASL_PLAINTEXT, SASL_SSL, SSL)
      */
-    String securityProtocolName();
+    SecurityProtocol securityProtocol();
 
     /**
      * Address of the authenticated client

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java 
b/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
deleted file mode 100644
index 72b5725..0000000
--- a/clients/src/main/java/org/apache/kafka/common/security/auth/Login.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.security.auth;
-
-import org.apache.kafka.common.security.JaasContext;
-
-import java.util.Map;
-
-import javax.security.auth.Subject;
-import javax.security.auth.login.LoginContext;
-import javax.security.auth.login.LoginException;
-
-/**
- * Login interface for authentication.
- */
-public interface Login {
-
-    /**
-     * Configures this login instance.
-     */
-    void configure(Map<String, ?> configs, JaasContext jaasContext);
-
-    /**
-     * Performs login for each login module specified for the login context of 
this instance.
-     */
-    LoginContext login() throws LoginException;
-
-    /**
-     * Returns the authenticated subject of this login context.
-     */
-    Subject subject();
-
-    /**
-     * Returns the service name to be used for SASL.
-     */
-    String serviceName();
-
-    /**
-     * Closes this instance.
-     */
-    void close();
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
index 96b8376..bc14d36 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common.security.auth;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
-
 import java.net.InetAddress;
 
 public class PlaintextAuthenticationContext implements AuthenticationContext {
@@ -28,8 +26,8 @@ public class PlaintextAuthenticationContext implements 
AuthenticationContext {
     }
 
     @Override
-    public String securityProtocolName() {
-        return SecurityProtocol.PLAINTEXT.name;
+    public SecurityProtocol securityProtocol() {
+        return SecurityProtocol.PLAINTEXT;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
index f98164b..89e6063 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common.security.auth;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
-
 import javax.security.sasl.SaslServer;
 import java.net.InetAddress;
 
@@ -37,8 +35,8 @@ public class SaslAuthenticationContext implements 
AuthenticationContext {
     }
 
     @Override
-    public String securityProtocolName() {
-        return securityProtocol.name;
+    public SecurityProtocol securityProtocol() {
+        return securityProtocol;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java
new file mode 100644
index 0000000..f48a194
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SecurityProtocol.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public enum SecurityProtocol {
+    /** Un-authenticated, non-encrypted channel */
+    PLAINTEXT(0, "PLAINTEXT"),
+    /** SSL channel */
+    SSL(1, "SSL"),
+    /** SASL authenticated, non-encrypted channel */
+    SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
+    /** SASL authenticated, SSL channel */
+    SASL_SSL(3, "SASL_SSL");
+
+    private static final Map<Short, SecurityProtocol> 
CODE_TO_SECURITY_PROTOCOL;
+    private static final List<String> NAMES;
+
+    static {
+        SecurityProtocol[] protocols = SecurityProtocol.values();
+        List<String> names = new ArrayList<>(protocols.length);
+        Map<Short, SecurityProtocol> codeToSecurityProtocol = new 
HashMap<>(protocols.length);
+        for (SecurityProtocol proto : protocols) {
+            codeToSecurityProtocol.put(proto.id, proto);
+            names.add(proto.name);
+        }
+        CODE_TO_SECURITY_PROTOCOL = 
Collections.unmodifiableMap(codeToSecurityProtocol);
+        NAMES = Collections.unmodifiableList(names);
+    }
+
+    /** The permanent and immutable id of a security protocol -- this can't 
change, and must match kafka.cluster.SecurityProtocol  */
+    public final short id;
+
+    /** Name of the security protocol. This may be used by client 
configuration. */
+    public final String name;
+
+    SecurityProtocol(int id, String name) {
+        this.id = (short) id;
+        this.name = name;
+    }
+
+    public static List<String> names() {
+        return NAMES;
+    }
+
+    public static SecurityProtocol forId(short id) {
+        return CODE_TO_SECURITY_PROTOCOL.get(id);
+    }
+
+    /** Case insensitive lookup by protocol name */
+    public static SecurityProtocol forName(String name) {
+        return SecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
index 325c282..d87a892 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.common.security.auth;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
-
 import javax.net.ssl.SSLSession;
 import java.net.InetAddress;
 
@@ -35,8 +33,8 @@ public class SslAuthenticationContext implements 
AuthenticationContext {
     }
 
     @Override
-    public String securityProtocolName() {
-        return SecurityProtocol.SSL.name;
+    public SecurityProtocol securityProtocol() {
+        return SecurityProtocol.SSL;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
index 18862be..643f859 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AbstractLogin.java
@@ -27,7 +27,6 @@ import 
javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.auth.Subject;
 
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.security.auth.Login;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/AuthCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/AuthCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AuthCallbackHandler.java
new file mode 100644
index 0000000..d517162
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/AuthCallbackHandler.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import java.util.Map;
+
+import org.apache.kafka.common.network.Mode;
+
+import javax.security.auth.Subject;
+import javax.security.auth.callback.CallbackHandler;
+
+/*
+ * Callback handler for SASL-based authentication
+ */
+public interface AuthCallbackHandler extends CallbackHandler {
+
+    /**
+     * Configures this callback handler.
+     *
+     * @param configs Configuration
+     * @param mode The mode that indicates if this is a client or server connection
+     * @param subject Subject from login context
+     * @param saslMechanism Negotiated SASL mechanism
+     */
+    void configure(Map<String, ?> configs, Mode mode, Subject subject, String saslMechanism);
+
+    /**
+     * Closes this instance.
+     */
+    void close();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/Login.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/Login.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/Login.java
new file mode 100644
index 0000000..b41d1b2
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/Login.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.authenticator;
+
+import org.apache.kafka.common.security.JaasContext;
+
+import java.util.Map;
+
+import javax.security.auth.Subject;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+
+/**
+ * Login interface for authentication.
+ */
+public interface Login {
+
+    /**
+     * Configures this login instance.
+     */
+    void configure(Map<String, ?> configs, JaasContext jaasContext);
+
+    /**
+     * Performs login for each login module specified for the login context of this instance.
+     */
+    LoginContext login() throws LoginException;
+
+    /**
+     * Returns the authenticated subject of this login context.
+     */
+    Subject subject();
+
+    /**
+     * Returns the service name to be used for SASL.
+     */
+    String serviceName();
+
+    /**
+     * Closes this instance.
+     */
+    void close();
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
index 66d5e3b..a576e37 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.security.auth.Login;
 import org.apache.kafka.common.security.kerberos.KerberosLogin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index d9e4f0c..b01ae4c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -41,7 +41,6 @@ import 
org.apache.kafka.common.requests.SaslAuthenticateRequest;
 import org.apache.kafka.common.requests.SaslAuthenticateResponse;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 7111bad..4756387 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -28,7 +28,6 @@ import javax.security.sasl.RealmCallback;
 
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 739e0cd..355e365 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -34,7 +34,7 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -46,7 +46,6 @@ import 
org.apache.kafka.common.requests.SaslAuthenticateResponse;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SaslAuthenticationContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
index 72e06a2..7d5372d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerCallbackHandler.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 import java.util.Map;
 
 import org.apache.kafka.common.security.JaasContext;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
index f81c7f1..d3b245d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/scram/ScramServerCallbackHandler.java
@@ -25,7 +25,7 @@ import javax.security.auth.callback.NameCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 
 import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import org.apache.kafka.common.security.authenticator.AuthCallbackHandler;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 
 public class ScramServerCallbackHandler implements AuthCallbackHandler {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
index abcc07a..aa7a15e 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.network;
 
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.ssl.SslFactory;
 
 import javax.net.ssl.SSLContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
index a4ce66c..9518315 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java
@@ -24,7 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.utils.MockTime;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 8d510f5..ad587b9 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredentialUtils;
 import org.apache.kafka.common.security.scram.ScramMechanism;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 275104a..750fd01 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.authenticator.TestJaasConfig;
 import org.apache.kafka.common.security.plain.PlainLoginModule;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index be4dbc7..e3d1831 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 35f1377..f6af817 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -19,7 +19,7 @@ package org.apache.kafka.common.network;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.memory.SimpleMemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 440140b..6229eb6 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -22,7 +22,7 @@ import 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.security.TestSecurityConfig;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.LogContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 7679711..baf0faf 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e96b188..edd1314 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -36,7 +36,7 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index fdf9e3c..a30c09f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.security.auth;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.TransportLayer;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.kerberos.KerberosName;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
index d878b72..7c028c4 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/ClientAuthenticationFailureTest.java
@@ -30,8 +30,8 @@ import 
org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.NetworkTestUtils;
 import org.apache.kafka.common.network.NioEchoServer;
-import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index c2c8d81..3b9e32b 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -36,7 +36,7 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.ApiVersionsRequest;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index 4abff84..022a099 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -22,7 +22,7 @@ import 
org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.JaasContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 21f23b7..2120657 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -35,8 +35,9 @@ import 
org.apache.kafka.common.errors.BrokerNotAvailableException
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala 
b/core/src/main/scala/kafka/client/ClientUtils.scala
index 3a2806f..5573256 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -16,7 +16,7 @@
  */
 package kafka.client
 
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.Errors
 
 import scala.collection._
 import kafka.cluster._
@@ -31,6 +31,8 @@ import kafka.network.BlockingChannel
 import kafka.utils.ZkUtils
 import java.io.IOException
 
+import org.apache.kafka.common.security.auth.SecurityProtocol
+
  /**
  * Helper functions common to clients (producer, consumer, or admin)
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala 
b/core/src/main/scala/kafka/cluster/Broker.scala
index 974e973..a148dfd 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -21,7 +21,7 @@ import kafka.common.{BrokerEndPointNotAvailableException, 
BrokerNotAvailableExce
 import kafka.utils.Json
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/cluster/EndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala 
b/core/src/main/scala/kafka/cluster/EndPoint.scala
index b3fc748..57ef0da 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -19,7 +19,7 @@ package kafka.cluster
 
 import kafka.common.KafkaException
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.Map

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 58e5543..5ac85cc 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -29,10 +29,11 @@ import kafka.utils._
 import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network._
-import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index fa792fb..bea8f79 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -35,8 +35,8 @@ import org.apache.kafka.common.memory.{MemoryPool, 
SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
 import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, KafkaChannel, ListenerName, Selectable, Send, Selector => KSelector}
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time}
 
 import scala.collection._

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ea0c124..82efaba 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -33,8 +33,8 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.security.auth.SecurityProtocol
 
 import scala.collection.JavaConverters._
 import scala.collection.Map

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala 
b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 43c81ab..0edc07a 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -27,7 +27,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import com.yammer.metrics.core.Gauge
 import org.I0Itec.zkclient.IZkStateListener
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
 import scala.collection.mutable.Set

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 
b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 0261254..20f1db2 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -22,8 +22,8 @@ import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 825ee89..789a48b 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -25,12 +25,11 @@ import java.lang.management._
 import java.util.{Properties, UUID}
 import javax.management._
 
-import org.apache.kafka.common.protocol.SecurityProtocol
-
 import scala.collection._
 import scala.collection.mutable
 import kafka.cluster.EndPoint
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 18a73b9..728e958 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -28,7 +28,6 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
-
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
@@ -36,15 +35,15 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 1369136..eadb488 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -30,8 +30,8 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala 
b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
index 7ec2feb..fce75b0 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala
@@ -19,7 +19,7 @@ package kafka.api
 
 import java.util.concurrent.ExecutionException
 import java.util.concurrent.atomic.AtomicReference
-import java.util.{Properties}
+import java.util.Properties
 
 import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
@@ -100,8 +100,8 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
   val topic = "e2etopic"
   val part = 0
   val tp = new TopicPartition(topic, part)
-  val topicAndPartition = new TopicAndPartition(topic, part)
-  this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, "kafka.api.EndToEndClusterIdTest$MockBrokerMetricsReporter")
+  val topicAndPartition = TopicAndPartition(topic, part)
+  this.serverConfig.setProperty(KafkaConfig.MetricReporterClassesProp, classOf[MockBrokerMetricsReporter].getName)
 
   override def generateConfigs = {
     val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
@@ -112,7 +112,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
 
   @Before
   override def setUp() {
-    super.setUp
+    super.setUp()
     MockDeserializer.resetStaticVariables
     // create the consumer offset topic
     TestUtils.createTopic(this.zkUtils, topic, 2, serverCount, this.servers)
@@ -129,9 +129,9 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
 
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockProducerInterceptor].getName)
     producerProps.put("mock.interceptor.append", appendStr)
-    producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "kafka.api.EndToEndClusterIdTest$MockProducerMetricsReporter")
+    producerProps.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockProducerMetricsReporter].getName)
     val testProducer = new KafkaProducer(producerProps, new MockSerializer, new MockSerializer)
 
     // Send one record and make sure clusterId is set after send and before onAcknowledgement
@@ -150,8 +150,8 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {
     isValidClusterId(MockProducerMetricsReporter.CLUSTER_META.get.clusterId)
 
     this.consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
-    this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "kafka.api.EndToEndClusterIdTest$MockConsumerMetricsReporter")
+    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[MockConsumerInterceptor].getName)
+    this.consumerConfig.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, classOf[MockConsumerMetricsReporter].getName)
     val testConsumer = new KafkaConsumer(this.consumerConfig, new MockDeserializer, new MockDeserializer)
     testConsumer.assign(List(tp).asJava)
     testConsumer.seek(tp, 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
index fd588de..2049e0a 100644
--- 
a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -18,7 +18,6 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
 import org.junit.Assert._
 
@@ -27,6 +26,7 @@ import java.util.Properties
 
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.security.auth.SecurityProtocol
 
 class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
   val offsetsTopicCompressionCodec = CompressionType.GZIP

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index f1bedfd..26022be 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -15,7 +15,6 @@ package kafka.api
 import java.util.{Locale, Properties}
 
 import kafka.log.LogConfig
-import kafka.network.RequestMetrics
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.utils.{JaasTestUtils, TestUtils}
 import com.yammer.metrics.Metrics
@@ -25,7 +24,7 @@ import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index c1b26f1..aad9b6a 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -21,7 +21,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
+import org.apache.kafka.common.{MetricName, TopicPartition}
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.header.Headers
 import org.apache.kafka.common.record.{CompressionType, TimestampType}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
index 2aeccb4..6279340 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala
@@ -18,8 +18,7 @@ package kafka.api
 
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, PlaintextAuthenticationContext}
+import org.apache.kafka.common.security.auth._
 import org.junit.Before
 
 // This test case uses a separate listener for client and inter-broker communication, from

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index 23b78b0..8cbdd93 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -18,8 +18,8 @@
 package kafka.api.test
 
 import java.util.{Collection, Collections, Properties}
-import scala.collection.JavaConverters._
 
+import scala.collection.JavaConverters._
 import org.junit.runners.Parameterized
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters
@@ -30,7 +30,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArraySerializer
 
 @RunWith(value = classOf[Parameterized])

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 8765040..b309b80 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -14,22 +14,21 @@ package kafka.api
 
 import java.io.FileOutputStream
 import java.util.Collections
-import java.util.concurrent.{ExecutionException, Future, TimeUnit}
-import scala.collection.JavaConverters._
+import java.util.concurrent.{ExecutionException, TimeUnit}
 
+import scala.collection.JavaConverters._
 import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
-import org.apache.kafka.clients.consumer.{KafkaConsumer, ConsumerConfig}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.SaslAuthenticationException
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
-
 import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils, ZkUtils}
+import org.apache.kafka.common.security.auth.SecurityProtocol
 
 class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with SaslSetup {
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
index cbe882d..a366b1d 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala
@@ -20,9 +20,9 @@ import java.util.Properties
 
 import kafka.utils.TestUtils
 import kafka.utils.Implicits._
-import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.GroupAuthorizationException
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{Before, Test}
 
 import scala.collection.immutable.List

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
index 4206616..6ae99df 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala
@@ -14,10 +14,10 @@ package kafka.api
 
 import java.io.File
 
-import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import org.junit.{After, Before, Test}
 import kafka.utils.{JaasTestUtils, TestUtils}
+import org.apache.kafka.common.security.auth.SecurityProtocol
 
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
index 34d0ebd..ea306a8 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala
@@ -15,10 +15,10 @@ package kafka.api
 import java.io.File
 import java.util.Locale
 
-import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import kafka.utils.{JaasTestUtils, TestUtils}
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before, Test}
 
 class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
index 5eca9c8..f877e68 100644
--- a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala
@@ -13,7 +13,7 @@
 package kafka.api
 
 import kafka.utils.JaasTestUtils
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before}
 
 class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslSetup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
index ed1c77b..2f50706 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslScramSslEndToEndAuthorizationTest.scala
@@ -18,7 +18,6 @@ package kafka.api
 
 import org.apache.kafka.common.security.scram.ScramMechanism
 import kafka.utils.JaasTestUtils
-import kafka.admin.ConfigCommand
 import kafka.utils.ZkUtils
 import scala.collection.JavaConverters._
 import org.junit.Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index c2b5993..a8de53f 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -15,14 +15,13 @@ package kafka.api
 import java.io.File
 
 import kafka.security.auth.{All, Allow, Alter, AlterConfigs, Authorizer, ClusterAction, Create, Delete, Deny, Describe, Operation, PermissionType, SimpleAclAuthorizer, Topic, Acl => AuthAcl, Resource => AuthResource}
-import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, JaasTestUtils, TestUtils}
 import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions}
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException}
 import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.junit.Assert.assertEquals
 import org.junit.{After, Assert, Before, Test}
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
index 450ea3e..a7f749d 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala
@@ -14,9 +14,9 @@ package kafka.api
 
 import java.io.File
 
-import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.server.KafkaConfig
 import kafka.utils.JaasTestUtils
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.{After, Before}
 
 class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {

http://git-wip-us.apache.org/repos/asf/kafka/blob/5383f9be/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
index 1d13d88..a09fcdc 100644
--- a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala
@@ -14,7 +14,7 @@ package kafka.api
 
 import java.io.File
 
-import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.security.auth.SecurityProtocol
 
 class SslConsumerTest extends BaseConsumerTest {
   override protected def securityProtocol = SecurityProtocol.SSL

Reply via email to