KAFKA-4565; Separation of Internal and External traffic (KIP-103)

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Gwen Shapira <csh...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #2354 from ijuma/kafka-4565-separation-of-internal-and-external-traffic


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2567188
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2567188
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2567188

Branch: refs/heads/trunk
Commit: d25671884bbbdf7843ada3e7797573a00ac7cd56
Parents: 80d6c64
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Fri Jan 13 10:00:06 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Jan 13 10:00:06 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   |   2 +-
 .../kafka/common/network/ListenerName.java      |  68 ++++++++++
 .../apache/kafka/common/protocol/Protocol.java  |  26 +++-
 .../common/requests/UpdateMetadataRequest.java  |  61 +++++----
 .../common/requests/RequestResponseTest.java    |  32 +++--
 config/server.properties                        |   5 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |  32 +++--
 .../kafka/admin/ConsumerGroupCommand.scala      |   3 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  12 +-
 .../main/scala/kafka/client/ClientUtils.scala   |  67 ++++++----
 core/src/main/scala/kafka/cluster/Broker.scala  | 106 +++++++++++----
 .../src/main/scala/kafka/cluster/EndPoint.scala |  59 ++++----
 .../kafka/consumer/ConsumerFetcherManager.scala |   2 +-
 .../controller/ControllerChannelManager.scala   |  32 +++--
 .../kafka/controller/KafkaController.scala      |  12 +-
 .../scala/kafka/network/RequestChannel.scala    |  24 ++--
 .../main/scala/kafka/network/SocketServer.scala |  35 +++--
 .../src/main/scala/kafka/server/KafkaApis.scala |  15 ++-
 .../main/scala/kafka/server/KafkaConfig.scala   | 133 ++++++++++++-------
 .../scala/kafka/server/KafkaHealthcheck.scala   |  16 ++-
 .../main/scala/kafka/server/KafkaServer.scala   |  15 ++-
 .../main/scala/kafka/server/MetadataCache.scala |  43 +++---
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |   3 +-
 .../scala/kafka/tools/UpdateOffsetsInZK.scala   |  14 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  24 +++-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  42 ++----
 .../kafka/api/AuthorizerIntegrationTest.scala   |   7 +-
 .../kafka/api/ProducerBounceTest.scala          |   2 +-
 .../kafka/api/ProducerCompressionTest.scala     |   2 +-
 ...eListenersWithSameSecurityProtocolTest.scala | 124 +++++++++++++++++
 .../unit/kafka/admin/AddPartitionsTest.scala    |  27 ++--
 .../admin/ReassignPartitionsCommandTest.scala   |   6 +-
 .../api/RequestResponseSerializationTest.scala  |  24 ++--
 .../unit/kafka/cluster/BrokerEndPointTest.scala |  58 ++++----
 .../integration/BaseTopicMetadataTest.scala     |  41 +++---
 .../unit/kafka/integration/FetcherTest.scala    |   4 +-
 .../integration/KafkaServerTestHarness.scala    |   7 +
 .../ProducerConsumerTestHarness.scala           |   2 +-
 .../unit/kafka/network/SocketServerTest.scala   |  12 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |   3 +-
 .../unit/kafka/producer/ProducerTest.scala      |   4 +-
 .../unit/kafka/producer/SyncProducerTest.scala  |  18 +--
 .../unit/kafka/server/AdvertiseBrokerTest.scala |  78 ++++++++---
 .../unit/kafka/server/BaseRequestTest.scala     |   3 +-
 .../ControlledShutdownLeaderSelectorTest.scala  |   2 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |   3 +-
 .../unit/kafka/server/KafkaConfigTest.scala     | 131 +++++++++++++++---
 .../unit/kafka/server/LeaderElectionTest.scala  |   8 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  50 ++++---
 .../unit/kafka/server/OffsetCommitTest.scala    |   2 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |   6 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |   2 +-
 .../unit/kafka/server/ServerStartupTest.scala   |   2 +-
 .../server/SessionExpireListenerTest.scala      |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  27 +++-
 .../integration/utils/KafkaEmbedded.java        |   3 +-
 58 files changed, 1049 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index a0f5fab..3d54515 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -35,7 +35,7 @@ public class ClientUtils {
     private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
 
     public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
-        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        List<InetSocketAddress> addresses = new ArrayList<>();
         for (String url : urls) {
             if (url != null && !url.isEmpty()) {
                 try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
new file mode 100644
index 0000000..b376514
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerName.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import org.apache.kafka.common.protocol.SecurityProtocol;
+
+import java.util.Locale;
+import java.util.Objects;
+
+public final class ListenerName {
+
+    /**
+     * Create an instance with the security protocol name as the value.
+     */
+    public static ListenerName forSecurityProtocol(SecurityProtocol securityProtocol) {
+        return new ListenerName(securityProtocol.name);
+    }
+
+    /**
+     * Create an instance with the provided value converted to uppercase.
+     */
+    public static ListenerName normalised(String value) {
+        return new ListenerName(value.toUpperCase(Locale.ROOT));
+    }
+
+    private final String value;
+
+    public ListenerName(String value) {
+        Objects.requireNonNull("value should not be null");
+        this.value = value;
+    }
+
+    public String value() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ListenerName))
+            return false;
+        ListenerName that = (ListenerName) o;
+        return value.equals(that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return value.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "ListenerName(" + value + ")";
+    }
+}
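
A brief note on usage: ListenerName is the key by which endpoints are looked up from
here on, so its equality semantics matter. A minimal sketch (Scala, written against
the class above purely for illustration):

    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Derived from a security protocol: the value is the protocol name, e.g. "PLAINTEXT".
    val plaintext = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)

    // Normalised from user input: uppercased, so "client" and "CLIENT" compare equal.
    val client = ListenerName.normalised("client")
    assert(client == new ListenerName("CLIENT"))
    assert(plaintext.value == "PLAINTEXT")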

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 6c94f6f..4946960 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -835,9 +835,31 @@ public class Protocol {
 
     public static final Schema UPDATE_METADATA_RESPONSE_V2 = UPDATE_METADATA_RESPONSE_V1;
 
+    public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V3 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V2;
 
-    public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1, UPDATE_METADATA_REQUEST_V2};
-    public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1, UPDATE_METADATA_RESPONSE_V2};
+    public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V3 =
+            new Schema(new Field("port", INT32, "The port on which the broker accepts requests."),
+                    new Field("host", STRING, "The hostname of the broker."),
+                    new Field("listener_name", STRING, "The listener name."),
+                    new Field("security_protocol_type", INT16, "The security protocol type."));
+
+    public static final Schema UPDATE_METADATA_REQUEST_BROKER_V3 =
+            new Schema(new Field("id", INT32, "The broker id."),
+                    new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V3)),
+                    new Field("rack", NULLABLE_STRING, "The rack"));
+
+    public static final Schema UPDATE_METADATA_REQUEST_V3 =
+            new Schema(new Field("controller_id", INT32, "The controller id."),
+                    new Field("controller_epoch", INT32, "The controller epoch."),
+                    new Field("partition_states", new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V3)),
+                    new Field("live_brokers", new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V3)));
+
+    public static final Schema UPDATE_METADATA_RESPONSE_V3 = UPDATE_METADATA_RESPONSE_V2;
+
+    public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1,
+        UPDATE_METADATA_REQUEST_V2, UPDATE_METADATA_REQUEST_V3};
+    public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1,
+        UPDATE_METADATA_RESPONSE_V2, UPDATE_METADATA_RESPONSE_V3};
 
     /* SASL handshake api */
     public static final Schema SASL_HANDSHAKE_REQUEST_V0 = new Schema(

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 5fd682c..95e5683 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -15,6 +15,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -52,10 +53,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
             short version = version();
             if (version == 0) {
                 for (Broker broker : liveBrokers) {
-                    if ((broker.endPoints.get(SecurityProtocol.PLAINTEXT) == null)
-                            || (broker.endPoints.size() != 1)) {
-                        throw new UnsupportedVersionException("UpdateMetadataRequest v0 only " +
-                                "handles PLAINTEXT endpoints");
+                    if (broker.endPoints.size() != 1 || broker.endPoints.get(0).securityProtocol != SecurityProtocol.PLAINTEXT) {
+                        throw new UnsupportedVersionException("UpdateMetadataRequest v0 only handles PLAINTEXT endpoints");
                     }
                 }
             }
@@ -77,25 +76,20 @@ public class UpdateMetadataRequest extends AbstractRequest {
 
     public static final class Broker {
         public final int id;
-        public final Map<SecurityProtocol, EndPoint> endPoints;
-        public final String rack;
+        public final List<EndPoint> endPoints;
+        public final String rack; // introduced in V2
 
-        public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints, 
String rack) {
+        public Broker(int id, List<EndPoint> endPoints, String rack) {
             this.id = id;
             this.endPoints = endPoints;
             this.rack = rack;
         }
 
-        @Deprecated
-        public Broker(int id, Map<SecurityProtocol, EndPoint> endPoints) {
-            this(id, endPoints, null);
-        }
-
         @Override
         public String toString() {
             StringBuilder bld = new StringBuilder();
             bld.append("(id=").append(id);
-            bld.append(", endPoints=").append(Utils.mkString(endPoints));
+            bld.append(", endPoints=").append(Utils.join(endPoints, ","));
             bld.append(", rack=").append(rack);
             bld.append(")");
             return bld.toString();
@@ -105,15 +99,20 @@ public class UpdateMetadataRequest extends AbstractRequest {
     public static final class EndPoint {
         public final String host;
         public final int port;
+        public final SecurityProtocol securityProtocol;
+        public final ListenerName listenerName; // introduced in V3
 
-        public EndPoint(String host, int port) {
+        public EndPoint(String host, int port, SecurityProtocol securityProtocol, ListenerName listenerName) {
             this.host = host;
             this.port = port;
+            this.securityProtocol = securityProtocol;
+            this.listenerName = listenerName;
         }
 
         @Override
         public String toString() {
-            return "(host=" + host + ", port=" + port + ")";
+            return "(host=" + host + ", port=" + port + ", listenerName=" + 
listenerName +
+                    ", securityProtocol=" + securityProtocol + ")";
         }
     }
 
@@ -139,6 +138,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     // EndPoint key names
     private static final String HOST_KEY_NAME = "host";
     private static final String PORT_KEY_NAME = "port";
+    private static final String LISTENER_NAME_KEY_NAME = "listener_name";
     private static final String SECURITY_PROTOCOL_TYPE_KEY_NAME = "security_protocol_type";
 
     private final int controllerId;
@@ -175,16 +175,18 @@ public class UpdateMetadataRequest extends AbstractRequest {
             brokerData.set(BROKER_ID_KEY_NAME, broker.id);
 
             if (version == 0) {
-                EndPoint endPoint = broker.endPoints.get(SecurityProtocol.PLAINTEXT);
+                EndPoint endPoint = broker.endPoints.get(0);
                 brokerData.set(HOST_KEY_NAME, endPoint.host);
                 brokerData.set(PORT_KEY_NAME, endPoint.port);
             } else {
                 List<Struct> endPointsData = new ArrayList<>(broker.endPoints.size());
-                for (Map.Entry<SecurityProtocol, EndPoint> entry : broker.endPoints.entrySet()) {
+                for (EndPoint endPoint : broker.endPoints) {
                     Struct endPointData = brokerData.instance(ENDPOINTS_KEY_NAME);
-                    endPointData.set(PORT_KEY_NAME, entry.getValue().port);
-                    endPointData.set(HOST_KEY_NAME, entry.getValue().host);
-                    endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, entry.getKey().id);
+                    endPointData.set(PORT_KEY_NAME, endPoint.port);
+                    endPointData.set(HOST_KEY_NAME, endPoint.host);
+                    endPointData.set(SECURITY_PROTOCOL_TYPE_KEY_NAME, endPoint.securityProtocol.id);
+                    if (version >= 3)
+                        endPointData.set(LISTENER_NAME_KEY_NAME, endPoint.listenerName.value());
                     endPointsData.add(endPointData);
 
                 }
@@ -242,17 +244,24 @@ public class UpdateMetadataRequest extends AbstractRequest {
             if (brokerData.hasField(HOST_KEY_NAME)) {
                 String host = brokerData.getString(HOST_KEY_NAME);
                 int port = brokerData.getInt(PORT_KEY_NAME);
-                Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>(1);
-                endPoints.put(SecurityProtocol.PLAINTEXT, new EndPoint(host, port));
+                List<EndPoint> endPoints = new ArrayList<>(1);
+                SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
+                endPoints.add(new EndPoint(host, port, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol)));
                 liveBrokers.add(new Broker(brokerId, endPoints, null));
-            } else { // V1 or V2
-                Map<SecurityProtocol, EndPoint> endPoints = new HashMap<>();
+            } else { // V1, V2 or V3
+                List<EndPoint> endPoints = new ArrayList<>();
                 for (Object endPointDataObj : brokerData.getArray(ENDPOINTS_KEY_NAME)) {
                     Struct endPointData = (Struct) endPointDataObj;
                     int port = endPointData.getInt(PORT_KEY_NAME);
                     String host = endPointData.getString(HOST_KEY_NAME);
                     short protocolTypeId = endPointData.getShort(SECURITY_PROTOCOL_TYPE_KEY_NAME);
-                    endPoints.put(SecurityProtocol.forId(protocolTypeId), new EndPoint(host, port));
+                    SecurityProtocol securityProtocol = SecurityProtocol.forId(protocolTypeId);
+                    String listenerName;
+                    if (endPointData.hasField(LISTENER_NAME_KEY_NAME)) // V3
+                        listenerName = endPointData.getString(LISTENER_NAME_KEY_NAME);
+                    else
+                        listenerName = securityProtocol.name;
+                    endPoints.add(new EndPoint(host, port, securityProtocol, new ListenerName(listenerName)));
                 }
                 String rack = null;
                 if (brokerData.hasField(RACK_KEY_NAME)) { // V2
@@ -270,7 +279,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         short versionId = version();
-        if (versionId <= 2)
+        if (versionId <= 3)
             return new UpdateMetadataResponse(Errors.forException(e).code());
         else
             throw new IllegalArgumentException(String.format("Version %d is 
not valid. Valid versions for %s are 0 to %d",

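With v3, each endpoint carries both its security protocol and its listener name, and
when the field is absent (v1/v2) the listener name falls back to the protocol name.
A minimal construction sketch (Scala, illustrative only):

    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol
    import org.apache.kafka.common.requests.UpdateMetadataRequest
    import scala.collection.JavaConverters._

    // Two SSL endpoints on one broker, distinguished only by listener name; this is
    // the case that v0-v2, which keyed endpoints by security protocol, could not represent.
    val endPoints = Seq(
      new UpdateMetadataRequest.EndPoint("host1", 9092, SecurityProtocol.SSL, new ListenerName("CLIENT")),
      new UpdateMetadataRequest.EndPoint("host1", 9093, SecurityProtocol.SSL, new ListenerName("REPLICATION"))
    ).asJava
    val broker = new UpdateMetadataRequest.Broker(0, endPoints, "rack1")
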
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index f02133d..de676c7 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,6 +16,7 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -99,10 +100,6 @@ public class RequestResponseTest {
         checkSerialization(createStopReplicaRequest(false));
         checkSerialization(createStopReplicaRequest(true).getErrorResponse(new UnknownServerException()), null);
         checkSerialization(createStopReplicaResponse(), null);
-        checkSerialization(createUpdateMetadataRequest(2, "rack1"));
-        checkSerialization(createUpdateMetadataRequest(2, null));
-        checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), null);
-        checkSerialization(createUpdateMetadataResponse(), null);
         checkSerialization(createLeaderAndIsrRequest());
         checkSerialization(createLeaderAndIsrRequest().getErrorResponse(new UnknownServerException()), null);
         checkSerialization(createLeaderAndIsrResponse(), null);
@@ -132,6 +129,13 @@ public class RequestResponseTest {
         checkSerialization(createUpdateMetadataRequest(1, null), 1);
         checkSerialization(createUpdateMetadataRequest(1, "rack1"), 1);
         checkSerialization(createUpdateMetadataRequest(1, null).getErrorResponse(new UnknownServerException()), 1);
+        checkSerialization(createUpdateMetadataRequest(2, "rack1"));
+        checkSerialization(createUpdateMetadataRequest(2, null));
+        checkSerialization(createUpdateMetadataRequest(2, "rack1").getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createUpdateMetadataRequest(3, "rack1"));
+        checkSerialization(createUpdateMetadataRequest(3, null));
+        checkSerialization(createUpdateMetadataRequest(3, "rack1").getErrorResponse(new UnknownServerException()), null);
+        checkSerialization(createUpdateMetadataResponse(), null);
         checkSerialization(createListOffsetRequest(0), 0);
         checkSerialization(createListOffsetRequest(0).getErrorResponse(new UnknownServerException()), 0);
         checkSerialization(createListOffsetResponse(0), 0);
@@ -498,16 +502,24 @@ public class RequestResponseTest {
         partitionStates.put(new TopicPartition("topic20", 1),
                 new PartitionState(1, 0, 1, new ArrayList<>(isr), 2, new HashSet<>(replicas)));
 
-        Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints1 = new HashMap<>();
-        endPoints1.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1223));
+        SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
+        List<UpdateMetadataRequest.EndPoint> endPoints1 = new ArrayList<>();
+        endPoints1.add(new UpdateMetadataRequest.EndPoint("host1", 1223, plaintext,
+                ListenerName.forSecurityProtocol(plaintext)));
 
-        Map<SecurityProtocol, UpdateMetadataRequest.EndPoint> endPoints2 = new HashMap<>();
-        endPoints2.put(SecurityProtocol.PLAINTEXT, new UpdateMetadataRequest.EndPoint("host1", 1244));
+        List<UpdateMetadataRequest.EndPoint> endPoints2 = new ArrayList<>();
+        endPoints2.add(new UpdateMetadataRequest.EndPoint("host1", 1244, plaintext,
+                ListenerName.forSecurityProtocol(plaintext)));
         if (version > 0) {
-            endPoints2.put(SecurityProtocol.SSL, new UpdateMetadataRequest.EndPoint("host2", 1234));
+            SecurityProtocol ssl = SecurityProtocol.SSL;
+            endPoints2.add(new UpdateMetadataRequest.EndPoint("host2", 1234, ssl,
+                    ListenerName.forSecurityProtocol(ssl)));
+            endPoints2.add(new UpdateMetadataRequest.EndPoint("host2", 1334, ssl,
+                    new ListenerName("CLIENT")));
         }
 
-        Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(new UpdateMetadataRequest.Broker(0, endPoints1, rack),
+        Set<UpdateMetadataRequest.Broker> liveBrokers = new HashSet<>(Arrays.asList(
+                new UpdateMetadataRequest.Broker(0, endPoints1, rack),
                 new UpdateMetadataRequest.Broker(1, endPoints2, rack)
         ));
         return new UpdateMetadataRequest.Builder(1, 10, partitionStates, liveBrokers).

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 60479b2..506d0e7 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -28,7 +28,7 @@ broker.id=0
 # The address the socket server listens on. It will get the value returned from
 # java.net.InetAddress.getCanonicalHostName() if not configured.
 #   FORMAT:
-#     listeners = security_protocol://host_name:port
+#     listeners = listener_name://host_name:port
 #   EXAMPLE:
 #     listeners = PLAINTEXT://your.host.name:9092
 #listeners=PLAINTEXT://:9092
@@ -38,6 +38,9 @@ broker.id=0
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092
 
+# Maps listener names to security protocols; the default is for them to be the same. See the config documentation for more details.
+#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
+
 # The number of threads handling network requests
 num.network.threads=3
 

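To make the new setting concrete: under KIP-103 a broker can serve internal and
external traffic on separate listeners, even when both use the same security
protocol. A hypothetical configuration sketch (listener and host names are invented
for illustration; inter.broker.listener.name is assumed here, inferred from the
interBrokerListenerName references in the diffs below):

    listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.example.com:9093
    advertised.listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.example.com:9093
    listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
    inter.broker.listener.name=INTERNAL
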
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 91cd426..5f5d20b 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -20,13 +20,15 @@ package kafka.admin
 import kafka.common._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
-import kafka.server.{DynamicConfig, ConfigEntityName, ConfigType}
+import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig}
 import kafka.utils._
 import kafka.utils.ZkUtils._
 import java.util.Random
 import java.util.Properties
+
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException}
+import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, LeaderNotAvailableException, ReplicaNotAvailableException, TopicExistsException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.MetadataResponse
 
@@ -622,22 +624,26 @@ object AdminUtils extends Logging with AdminUtilities {
   }
 
   def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): MetadataResponse.TopicMetadata =
-    fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
+    fetchTopicMetadataFromZk(topic, zkUtils, mutable.Map.empty[Int, Broker],
+      ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
 
   def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[MetadataResponse.TopicMetadata] =
-    fetchTopicMetadataFromZk(topics, zkUtils, SecurityProtocol.PLAINTEXT)
+    fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, securityProtocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] =
+    fetchTopicMetadataFromZk(topics, zkUtils, ListenerName.forSecurityProtocol(securityProtocol))
 
-  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, protocol: SecurityProtocol): Set[MetadataResponse.TopicMetadata] = {
-    val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
-    topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, protocol))
+  def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils, listenerName: ListenerName): Set[MetadataResponse.TopicMetadata] = {
+    val cachedBrokerInfo = mutable.Map.empty[Int, Broker]
+    topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo, listenerName))
   }
 
   private def fetchTopicMetadataFromZk(topic: String,
                                        zkUtils: ZkUtils,
-                                       cachedBrokerInfo: mutable.HashMap[Int, Broker],
-                                       protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): MetadataResponse.TopicMetadata = {
+                                       cachedBrokerInfo: mutable.Map[Int, Broker],
+                                       listenerName: ListenerName): MetadataResponse.TopicMetadata = {
     if (zkUtils.pathExists(getTopicPath(topic))) {
-      val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
+      val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic))(topic)
       val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
       val partitionMetadata = sortedPartitions.map { partitionMap =>
         val partition = partitionMap._1
@@ -653,15 +659,15 @@ object AdminUtils extends Logging with AdminUtilities {
           leaderInfo = leader match {
             case Some(l) =>
               try {
-                getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(protocol)
+                getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, List(l)).head.getNode(listenerName)
               } catch {
                 case e: Throwable => throw new LeaderNotAvailableException("Leader not available for partition [%s,%d]".format(topic, partition), e)
               }
             case None => throw new LeaderNotAvailableException("No leader exists for partition " + partition)
           }
           try {
-            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(protocol))
-            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(protocol))
+            replicaInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, replicas).map(_.getNode(listenerName))
+            isrInfo = getBrokerInfoFromCache(zkUtils, cachedBrokerInfo, inSyncReplicas).map(_.getNode(listenerName))
           } catch {
             case e: Throwable => throw new ReplicaNotAvailableException(e)
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index b53856e..a832c38 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.serialization.StringDeserializer
@@ -367,7 +368,7 @@ object ConsumerGroupCommand extends Logging {
     private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
       try {
         zkUtils.getBrokerInfo(brokerId)
-          .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+          .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
           .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerGroupCommand"))
           .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)))
       } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 4052639..4cd10f4 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -59,8 +59,10 @@ object ApiVersion {
     "0.10.1-IV1" -> KAFKA_0_10_1_IV1,
     // introduced ListOffsetRequest v1 in KIP-79
     "0.10.1-IV2" -> KAFKA_0_10_1_IV2,
-    "0.10.1" -> KAFKA_0_10_1_IV2
-
+    "0.10.1" -> KAFKA_0_10_1_IV2,
+    // introduced UpdateMetadataRequest v3 in KIP-103
+    "0.10.2-IV0" -> KAFKA_0_10_2_IV0,
+    "0.10.2" -> KAFKA_0_10_2_IV0
   )
 
   private val versionPattern = "\\.".r
@@ -138,3 +140,9 @@ case object KAFKA_0_10_1_IV2 extends ApiVersion {
   val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
   val id: Int = 8
 }
+
+case object KAFKA_0_10_2_IV0 extends ApiVersion {
+  val version: String = "0.10.2-IV0"
+  val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+  val id: Int = 9
+}
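
Since UpdateMetadataRequest v3 is gated on the inter-broker protocol version (see
ControllerChannelManager below), the new listener metadata only flows once the
cluster reaches 0.10.2-IV0. A hypothetical rolling-upgrade sequence using the
version strings defined above:

    # Phase 1: upgrade broker binaries while pinning the protocol to the old version.
    inter.broker.protocol.version=0.10.1

    # Phase 2: after all brokers run the new code, raise the version ("0.10.2"
    # resolves to 0.10.2-IV0 per the mapping above) and do a second rolling restart.
    inter.broker.protocol.version=0.10.2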

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 8893697..dbb8a76 100755
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -22,9 +22,10 @@ import scala.collection._
 import kafka.cluster._
 import kafka.api._
 import kafka.producer._
-import kafka.common.KafkaException
+import kafka.common.{BrokerEndPointNotAvailableException, KafkaException}
 import kafka.utils.{CoreUtils, Logging}
 import java.util.Properties
+
 import util.Random
 import kafka.network.BlockingChannel
 import kafka.utils.ZkUtils
@@ -105,34 +106,46 @@ object ClientUtils extends Logging{
     }
   }
 
+  /**
+   * Creates a blocking channel to a random broker
+   */
+  def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000) : BlockingChannel = {
+    var channel: BlockingChannel = null
+    var connected = false
+    while (!connected) {
+      val allBrokers = getPlaintextBrokerEndPoints(zkUtils)
+      Random.shuffle(allBrokers).find { broker =>
+        trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
+        try {
+          channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+          channel.connect()
+          debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
+          true
+        } catch {
+          case _: Exception =>
+            if (channel != null) channel.disconnect()
+            channel = null
+            info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
+            false
+        }
+      }
+      connected = channel != null
+    }
+
+    channel
+  }
+
    /**
-    * Creates a blocking channel to a random broker
+    * Returns the first end point from each broker with the PLAINTEXT security protocol.
     */
-   def channelToAnyBroker(zkUtils: ZkUtils, socketTimeoutMs: Int = 3000) : BlockingChannel = {
-     var channel: BlockingChannel = null
-     var connected = false
-     while (!connected) {
-       val allBrokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
-       Random.shuffle(allBrokers).find { broker =>
-         trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
-         try {
-           channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
-           channel.connect()
-           debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
-           true
-         } catch {
-           case _: Exception =>
-             if (channel != null) channel.disconnect()
-             channel = null
-             info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
-             false
-         }
-       }
-       connected = channel != null
-     }
-
-     channel
-   }
+  def getPlaintextBrokerEndPoints(zkUtils: ZkUtils): Seq[BrokerEndPoint] = {
+    zkUtils.getAllBrokersInCluster().map { broker =>
+      broker.endPoints.collectFirst {
+        case endPoint if endPoint.securityProtocol == SecurityProtocol.PLAINTEXT =>
+          new BrokerEndPoint(broker.id, endPoint.host, endPoint.port)
+      }.getOrElse(throw new BrokerEndPointNotAvailableException(s"End point with security protocol PLAINTEXT not found for broker ${broker.id}"))
+    }
+  }
 
    /**
     * Creates a blocking channel to the offset manager of the given group

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/cluster/Broker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 7116722..00b4078 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -20,7 +20,9 @@ package kafka.cluster
 import kafka.common.{BrokerEndPointNotAvailableException, BrokerNotAvailableException, KafkaException}
 import kafka.utils.Json
 import org.apache.kafka.common.Node
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.utils.Time
 
 /**
  * A Kafka broker.
@@ -29,6 +31,15 @@ import org.apache.kafka.common.protocol.SecurityProtocol
  */
 object Broker {
 
+  private val HostKey = "host"
+  private val PortKey = "port"
+  private val VersionKey = "version"
+  private val EndpointsKey = "endpoints"
+  private val RackKey = "rack"
+  private val JmxPortKey = "jmx_port"
+  private val ListenerSecurityProtocolMapKey = "listener_security_protocol_map"
+  private val TimestampKey = "timestamp"
+
   /**
     * Create a broker object from id and JSON string.
     *
@@ -54,7 +65,7 @@ object Broker {
     *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"]
     * }
     *
-    * Version 3 (current) JSON schema for a broker is:
+    * Version 3 JSON schema for a broker is:
     * {
     *   "version":3,
     *   "host":"localhost",
@@ -64,6 +75,18 @@ object Broker {
     *   "endpoints":["PLAINTEXT://host1:9092", "SSL://host1:9093"],
     *   "rack":"dc1"
     * }
+    *
+    * Version 4 (current) JSON schema for a broker is:
+    * {
+    *   "version":4,
+    *   "host":"localhost",
+    *   "port":9092
+    *   "jmx_port":9999,
+    *   "timestamp":"2233345666",
+    *   "endpoints":["CLIENT://host1:9092", "REPLICATION://host1:9093"],
+    *   "listener_security_protocol_map":{"CLIENT":"SSL", 
"REPLICATION":"PLAINTEXT"}
+    *   "rack":"dc1"
+    * }
     */
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if (brokerInfoString == null)
@@ -72,24 +95,28 @@ object Broker {
       Json.parseFull(brokerInfoString) match {
         case Some(m) =>
           val brokerInfo = m.asInstanceOf[Map[String, Any]]
-          val version = brokerInfo("version").asInstanceOf[Int]
+          val version = brokerInfo(VersionKey).asInstanceOf[Int]
           val endpoints =
             if (version < 1)
               throw new KafkaException(s"Unsupported version of broker registration: $brokerInfoString")
             else if (version == 1) {
-              val host = brokerInfo("host").asInstanceOf[String]
-              val port = brokerInfo("port").asInstanceOf[Int]
-              Map(SecurityProtocol.PLAINTEXT -> new EndPoint(host, port, SecurityProtocol.PLAINTEXT))
+              val host = brokerInfo(HostKey).asInstanceOf[String]
+              val port = brokerInfo(PortKey).asInstanceOf[Int]
+              val securityProtocol = SecurityProtocol.PLAINTEXT
+              val endPoint = new EndPoint(host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+              Seq(endPoint)
             }
             else {
-              val listeners = brokerInfo("endpoints").asInstanceOf[List[String]]
-              listeners.map { listener =>
-                val ep = EndPoint.createEndPoint(listener)
-                (ep.protocolType, ep)
-              }.toMap
+              val securityProtocolMap = brokerInfo.get(ListenerSecurityProtocolMapKey).map(
+                _.asInstanceOf[Map[String, String]].map { case (listenerName, securityProtocol) =>
+                  new ListenerName(listenerName) -> SecurityProtocol.forName(securityProtocol)
+                })
+              val listeners = brokerInfo(EndpointsKey).asInstanceOf[List[String]]
+              listeners.map(EndPoint.createEndPoint(_, securityProtocolMap))
             }
-          val rack = brokerInfo.get("rack").filter(_ != null).map(_.asInstanceOf[String])
-          new Broker(id, endpoints, rack)
+          val rack = brokerInfo.get(RackKey).filter(_ != null).map(_.asInstanceOf[String])
+
+          Broker(id, endpoints, rack)
         case None =>
           throw new BrokerNotAvailableException(s"Broker id $id does not exist")
       }
@@ -98,34 +125,57 @@ object Broker {
         throw new KafkaException(s"Failed to parse the broker info from 
zookeeper: $brokerInfoString", t)
     }
   }
-}
 
-case class Broker(id: Int, endPoints: collection.Map[SecurityProtocol, EndPoint], rack: Option[String]) {
+  def toJson(version: Int, id: Int, host: String, port: Int, advertisedEndpoints: Seq[EndPoint], jmxPort: Int,
+             rack: Option[String]): String = {
+    val jsonMap = collection.mutable.Map(VersionKey -> version,
+      HostKey -> host,
+      PortKey -> port,
+      EndpointsKey -> advertisedEndpoints.map(_.connectionString).toArray,
+      JmxPortKey -> jmxPort,
+      TimestampKey -> Time.SYSTEM.milliseconds().toString
+    )
+    rack.foreach(rack => if (version >= 3) jsonMap += (RackKey -> rack))
 
-  override def toString: String =
-    s"$id : ${endPoints.values.mkString("(",",",")")} : ${rack.orNull}"
+    if (version >= 4) {
+      jsonMap += (ListenerSecurityProtocolMapKey -> advertisedEndpoints.map { endPoint =>
+        endPoint.listenerName.value -> endPoint.securityProtocol.name
+      }.toMap)
+    }
 
-  def this(id: Int, endPoints: Map[SecurityProtocol, EndPoint]) = {
-    this(id, endPoints, None)
+    Json.encode(jsonMap)
   }
+}
+
+case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
+
+  private val endPointsMap = endPoints.map { endPoint =>
+    endPoint.listenerName -> endPoint
+  }.toMap
+
+  if (endPointsMap.size != endPoints.size)
+    throw new IllegalArgumentException(s"There is more than one end point with the same listener name: ${endPoints.mkString(",")}")
+
+  override def toString: String =
+    s"$id : ${endPointsMap.values.mkString("(",",",")")} : ${rack.orNull}"
 
-  def this(id: Int, host: String, port: Int, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
-    this(id, Map(protocol -> EndPoint(host, port, protocol)), None)
+  def this(id: Int, host: String, port: Int, listenerName: ListenerName, protocol: SecurityProtocol) = {
+    this(id, Seq(EndPoint(host, port, listenerName, protocol)), None)
   }
 
-  def this(bep: BrokerEndPoint, protocol: SecurityProtocol) = {
-    this(bep.id, bep.host, bep.port, protocol)
+  def this(bep: BrokerEndPoint, listenerName: ListenerName, protocol: SecurityProtocol) = {
+    this(bep.id, bep.host, bep.port, listenerName, protocol)
   }
 
-  def getNode(protocolType: SecurityProtocol): Node = {
-    val endpoint = endPoints.getOrElse(protocolType,
-      throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
+  def getNode(listenerName: ListenerName): Node = {
+    val endpoint = endPointsMap.getOrElse(listenerName,
+      throw new BrokerEndPointNotAvailableException(s"End point with listener name $listenerName not found for broker $id"))
     new Node(id, endpoint.host, endpoint.port, rack.orNull)
   }
 
-  def getBrokerEndPoint(protocolType: SecurityProtocol): BrokerEndPoint = {
-    val endpoint = endPoints.getOrElse(protocolType,
-      throw new BrokerEndPointNotAvailableException(s"End point with security protocol $protocolType not found for broker $id"))
+  def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
+    val endpoint = endPointsMap.getOrElse(listenerName,
+      throw new BrokerEndPointNotAvailableException(s"End point with listener name $listenerName not found for broker $id"))
     new BrokerEndPoint(id, endpoint.host, endpoint.port)
   }
 

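The version 4 registration makes the listener-to-security-protocol map part of the
broker's ZooKeeper data, so a parsed Broker can resolve endpoints by listener name.
A small round-trip sketch (Scala, illustrative; the JSON mirrors the version 4 schema
documented above):

    import kafka.cluster.Broker
    import org.apache.kafka.common.network.ListenerName

    val registration =
      """{"version":4,"jmx_port":9999,"timestamp":"2233345666",
        |"endpoints":["CLIENT://host1:9092","REPLICATION://host1:9093"],
        |"listener_security_protocol_map":{"CLIENT":"SSL","REPLICATION":"PLAINTEXT"},
        |"rack":"dc1"}""".stripMargin
    val broker = Broker.createBroker(1, registration)
    // Endpoints are now keyed by listener name rather than by security protocol.
    val replicationNode = broker.getNode(new ListenerName("REPLICATION"))
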
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/cluster/EndPoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala
index 720d819..b3fc748 100644
--- a/core/src/main/scala/kafka/cluster/EndPoint.scala
+++ b/core/src/main/scala/kafka/cluster/EndPoint.scala
@@ -17,37 +17,44 @@
 
 package kafka.cluster
 
-import java.nio.ByteBuffer
-
-import kafka.api.ApiUtils._
 import kafka.common.KafkaException
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.Map
+
 object EndPoint {
 
   private val uriParseExp = """^(.*)://\[?([0-9a-zA-Z\-%._:]*)\]?:(-?[0-9]+)""".r
 
-  def readFrom(buffer: ByteBuffer): EndPoint = {
-    val port = buffer.getInt()
-    val host = readShortString(buffer)
-    val protocol = buffer.getShort()
-    EndPoint(host, port, SecurityProtocol.forId(protocol))
-  }
+  private[kafka] val DefaultSecurityProtocolMap: Map[ListenerName, SecurityProtocol] =
+    SecurityProtocol.values.map(sp => ListenerName.forSecurityProtocol(sp) -> sp).toMap
 
   /**
-   * Create EndPoint object from connectionString
-   * @param connectionString the format is protocol://host:port or protocol://[ipv6 host]:port
-   *                         for example: PLAINTEXT://myhost:9092 or PLAINTEXT://[::1]:9092
+   * Create EndPoint object from `connectionString` and optional `securityProtocolMap`. If the latter is not provided,
+   * we fall back to the default behaviour where listener names are the same as security protocols.
+   *
+   * @param connectionString the format is listener_name://host:port or listener_name://[ipv6 host]:port
+   *                         for example: PLAINTEXT://myhost:9092, CLIENT://myhost:9092 or REPLICATION://[::1]:9092
    *                         Host can be empty (PLAINTEXT://:9092) in which case we'll bind to default interface
    *                         Negative ports are also accepted, since they are used in some unit tests
-   * @return
    */
-  def createEndPoint(connectionString: String): EndPoint = {
+  def createEndPoint(connectionString: String, securityProtocolMap: Option[Map[ListenerName, SecurityProtocol]]): EndPoint = {
+    val protocolMap = securityProtocolMap.getOrElse(DefaultSecurityProtocolMap)
+
+    def securityProtocol(listenerName: ListenerName): SecurityProtocol =
+      protocolMap.getOrElse(listenerName,
+        throw new IllegalArgumentException(s"No security protocol defined for listener ${listenerName.value}"))
+
     connectionString match {
-      case uriParseExp(protocol, "", port) => new EndPoint(null, port.toInt, SecurityProtocol.forName(protocol))
-      case uriParseExp(protocol, host, port) => new EndPoint(host, port.toInt, SecurityProtocol.forName(protocol))
-      case _ => throw new KafkaException("Unable to parse " + connectionString + " to a broker endpoint")
+      case uriParseExp(listenerNameString, "", port) =>
+        val listenerName = ListenerName.normalised(listenerNameString)
+        new EndPoint(null, port.toInt, listenerName, securityProtocol(listenerName))
+      case uriParseExp(listenerNameString, host, port) =>
+        val listenerName = ListenerName.normalised(listenerNameString)
+        new EndPoint(host, port.toInt, listenerName, securityProtocol(listenerName))
+      case _ => throw new KafkaException(s"Unable to parse $connectionString to a broker endpoint")
     }
   }
 }
@@ -55,25 +62,13 @@ object EndPoint {
 /**
  * Part of the broker definition - matching host/port pair to a protocol
  */
-case class EndPoint(host: String, port: Int, protocolType: SecurityProtocol) {
-
-  def connectionString(): String = {
+case class EndPoint(host: String, port: Int, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
+  def connectionString: String = {
     val hostport =
       if (host == null)
         ":"+port
       else
         Utils.formatAddress(host, port)
-    protocolType + "://" + hostport
-  }
-
-  def writeTo(buffer: ByteBuffer): Unit = {
-    buffer.putInt(port)
-    writeShortString(buffer, host)
-    buffer.putShort(protocolType.id)
+    listenerName.value + "://" + hostport
   }
-
-  def sizeInBytes: Int =
-    4 + /* port */
-    shortStringLength(host) +
-    2 /* protocol id */
 }

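Parsing is where listener names become user-visible: the scheme of a listener URI is
now a free-form name resolved to a security protocol through the optional map. A
small sketch of both paths (Scala, illustrative):

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Without a map, the listener name must itself be a security protocol name.
    val plaintext = EndPoint.createEndPoint("PLAINTEXT://myhost:9092", None)

    // With an explicit map, arbitrary names are allowed (and normalised to uppercase).
    val protocolMap = Map(new ListenerName("CLIENT") -> SecurityProtocol.SSL)
    val client = EndPoint.createEndPoint("client://myhost:9093", Some(protocolMap))
    assert(client.securityProtocol == SecurityProtocol.SSL)
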
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index dcdeb1e..3b054e4 100755
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -64,7 +64,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
         }
 
         trace("Partitions without leader %s".format(noLeaderPartitionSet))
-        val brokers = zkUtils.getAllBrokerEndPointsForChannel(SecurityProtocol.PLAINTEXT)
+        val brokers = ClientUtils.getPlaintextBrokerEndPoints(zkUtils)
         val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                             brokers,
                                                             config.clientId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index e9ccf64..d928034 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,10 +24,10 @@ import kafka.cluster.Broker
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector}
-import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, SecurityProtocol}
+import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, LoginType, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.EndPoint
@@ -89,7 +89,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
-    val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
+    val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
     val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -353,7 +353,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         }
         val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
-          _.getNode(controller.config.interBrokerSecurityProtocol)
+          _.getNode(controller.config.interBrokerListenerName)
         }
         val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -379,29 +379,33 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
         topicPartition -> partitionState
       }
 
-      val version: Short = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
-                    else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
-                    else 0: Short
+      val version: Short =
+        if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_2_IV0) 3
+        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2
+        else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
+        else 0
 
       val updateMetadataRequest = {
         val liveBrokers = if (version == 0) {
           // Version 0 of UpdateMetadataRequest only supports PLAINTEXT.
           controllerContext.liveOrShuttingDownBrokers.map { broker =>
-            val node = broker.getNode(SecurityProtocol.PLAINTEXT)
-            val endPoints = Map(SecurityProtocol.PLAINTEXT -> new EndPoint(node.host(), node.port()))
+            val securityProtocol = SecurityProtocol.PLAINTEXT
+            val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+            val node = broker.getNode(listenerName)
+            val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
             new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
           }
         } else {
           controllerContext.liveOrShuttingDownBrokers.map { broker =>
-            val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
-              securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
+            val endPoints = broker.endPoints.map { endPoint =>
+              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
             }
             new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
           }
         }
         new UpdateMetadataRequest.Builder(
-            controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
-            setVersion(version)
+          controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava).
+          setVersion(version)
       }
 
       updateMetadataRequestBrokerSet.foreach { broker =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 83c6d01..8ffa610 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -210,12 +210,12 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
     }
   )
 
-  def epoch = controllerContext.epoch
+  def epoch: Int = controllerContext.epoch
 
-  def clientId = {
-    val listeners = config.listeners
-    val controllerListener = listeners.get(config.interBrokerSecurityProtocol)
-    "id_%d-host_%s-port_%d".format(config.brokerId, 
controllerListener.get.host, controllerListener.get.port)
+  def clientId: String = {
+    val controllerListener = config.listeners.find(_.listenerName == 
config.interBrokerListenerName).getOrElse(
+      throw new IllegalArgumentException(s"No listener with name 
${config.interBrokerListenerName} is configured."))
+    "id_%d-host_%s-port_%d".format(config.brokerId, controllerListener.host, 
controllerListener.port)
   }
 
   /**
@@ -226,7 +226,7 @@ class KafkaController(val config: KafkaConfig, zkUtils: 
ZkUtils, val brokerState
    * @param id Id of the broker to shutdown.
    * @return The number of partitions that the broker still leads.
    */
-  def shutdownBroker(id: Int) : Set[TopicAndPartition] = {
+  def shutdownBroker(id: Int): Set[TopicAndPartition] = {
 
     if (!isActive) {
       throw new ControllerMovedException("Controller moved to another broker. 
Aborting controlled shutdown")
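
Because several listeners may now share a security protocol, the inter-broker listener has to be resolved by name, as clientId does above. A minimal sketch of that lookup, using hypothetical Listener/ListenerName case classes rather than Kafka's own:

    object ListenerLookupSketch {
      // Illustrative stand-ins for org.apache.kafka.common.network.ListenerName
      // and kafka.cluster.EndPoint; not the real Kafka classes.
      case class ListenerName(value: String)
      case class Listener(listenerName: ListenerName, host: String, port: Int)

      def interBrokerListener(listeners: Seq[Listener], name: ListenerName): Listener =
        listeners.find(_.listenerName == name).getOrElse(
          throw new IllegalArgumentException(s"No listener with name ${name.value} is configured."))

      def main(args: Array[String]): Unit = {
        // Two listeners that could use the same security protocol but differ by name.
        val listeners = Seq(
          Listener(ListenerName("INTERNAL"), "broker1.internal", 9092),
          Listener(ListenerName("EXTERNAL"), "broker1.example.com", 9093))
        println(interBrokerListener(listeners, ListenerName("INTERNAL")).port) // 9092
      }
    }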

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index a511440..c063801 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -29,7 +29,7 @@ import kafka.server.QuotaId
 import kafka.utils.Logging
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
-import org.apache.kafka.common.network.Send
+import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Protocol, SecurityProtocol}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests._
@@ -38,12 +38,14 @@ import org.apache.kafka.common.utils.Time
 import org.apache.log4j.Logger
 
 object RequestChannel extends Logging {
-  val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
+  val AllDone = Request(processor = 1, connectionId = "2", Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost),
+    buffer = getShutdownReceive, startTimeMs = 0, listenerName = new ListenerName(""),
+    securityProtocol = SecurityProtocol.PLAINTEXT)
   private val requestLogger = Logger.getLogger("kafka.request.logger")
 
-  def getShutdownReceive() = {
+  private def getShutdownReceive = {
     val emptyProduceRequest = new ProduceRequest.Builder(0, 0, new HashMap[TopicPartition, MemoryRecords]()).build()
-    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version(), "", 0)
+    val emptyRequestHeader = new RequestHeader(ApiKeys.PRODUCE.id, emptyProduceRequest.version, "", 0)
     AbstractRequestResponse.serialize(emptyRequestHeader, emptyProduceRequest)
   }
 
@@ -51,7 +53,8 @@ object RequestChannel extends Logging {
     val sanitizedUser = QuotaId.sanitize(principal.getName)
   }
 
-  case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
+  case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer,
+                     startTimeMs: Long, listenerName: ListenerName, securityProtocol: SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
     @volatile var requestDequeueTimeMs = -1L
@@ -149,12 +152,11 @@ object RequestChannel extends Logging {
         m.totalTimeHist.update(totalTime)
       }
 
-      if (requestLogger.isTraceEnabled)
-        requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-          .format(requestDesc(true), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
-      else if (requestLogger.isDebugEnabled)
-        requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s"
-          .format(requestDesc(false), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal))
+      if (requestLogger.isDebugEnabled) {
+        val detailsEnabled = requestLogger.isTraceEnabled
+        requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d,securityProtocol:%s,principal:%s,listener:%s"
+          .format(requestDesc(detailsEnabled), connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime, securityProtocol, session.principal, listenerName.value))
+      }
     }
   }
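
The collapsed logging branch keeps a single statement and lets the trace level control only how much detail the request description carries. A dependency-free sketch of the pattern; LevelLogger is an illustrative stand-in for the log4j logger used above:

    object RequestLogSketch {
      // Minimal stand-in for a leveled logger (assumption, not log4j itself).
      final class LevelLogger(debugEnabled: Boolean, traceEnabled: Boolean) {
        def isDebugEnabled: Boolean = debugEnabled
        def isTraceEnabled: Boolean = traceEnabled
        def debug(msg: String): Unit = if (debugEnabled) println(s"DEBUG $msg")
      }

      def logCompleted(logger: LevelLogger, requestDesc: Boolean => String, listener: String): Unit =
        if (logger.isDebugEnabled) {
          // Trace adds detail to the one debug message instead of duplicating the branch.
          val detailsEnabled = logger.isTraceEnabled
          logger.debug("Completed request:%s,listener:%s".format(requestDesc(detailsEnabled), listener))
        }

      def main(args: Array[String]): Unit =
        logCompleted(new LevelLogger(debugEnabled = true, traceEnabled = false),
          details => if (details) "ProduceRequest(full details)" else "ProduceRequest", "INTERNAL")
    }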
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d8d0144..c0353d5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -34,7 +34,7 @@ import kafka.server.KafkaConfig
 import kafka.utils._
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, Selectable, Selector => KSelector}
+import org.apache.kafka.common.network.{ChannelBuilders, KafkaChannel, ListenerName, LoginType, Mode, Selectable, Selector => KSelector}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -52,7 +52,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
  */
 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
 
-  private val endpoints = config.listeners
+  private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
   private val numProcessorThreads = config.numNetworkThreads
   private val maxQueuedRequests = config.queuedMaxRequests
   private val totalProcessorThreads = numProcessorThreads * endpoints.size
@@ -87,17 +87,18 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       val brokerId = config.brokerId
 
       var processorBeginIndex = 0
-      endpoints.values.foreach { endpoint =>
-        val protocol = endpoint.protocolType
+      config.listeners.foreach { endpoint =>
+        val listenerName = endpoint.listenerName
+        val securityProtocol = endpoint.securityProtocol
         val processorEndIndex = processorBeginIndex + numProcessorThreads
 
         for (i <- processorBeginIndex until processorEndIndex)
-          processors(i) = newProcessor(i, connectionQuotas, protocol)
+          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol)
 
         val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
           processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
         acceptors.put(endpoint, acceptor)
-        Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor, false).start()
+        Utils.newThread(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor, false).start()
         acceptor.awaitStartup()
 
         processorBeginIndex = processorEndIndex
@@ -130,23 +131,25 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     info("Shutdown completed")
   }
 
-  def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = {
+  def boundPort(listenerName: ListenerName): Int = {
     try {
-      acceptors(endpoints(protocol)).serverChannel.socket().getLocalPort
+      acceptors(endpoints(listenerName)).serverChannel.socket.getLocalPort
     } catch {
      case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
    }
  }
 
   /* `protected` for test usage */
-  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
+  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                      securityProtocol: SecurityProtocol): Processor = {
     new Processor(id,
       time,
       config.socketRequestMaxBytes,
       requestChannel,
       connectionQuotas,
       config.connectionsMaxIdleMs,
-      protocol,
+      listenerName,
+      securityProtocol,
       config.values,
       metrics,
       credentialProvider
@@ -253,7 +256,8 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
 
   this.synchronized {
     processors.foreach { processor =>
-      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start()
+      Utils.newThread(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+        processor, false).start()
     }
   }
 
@@ -373,7 +377,8 @@ private[kafka] class Processor(val id: Int,
                                requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
                                connectionsMaxIdleMs: Long,
-                               protocol: SecurityProtocol,
+                               listenerName: ListenerName,
+                               securityProtocol: SecurityProtocol,
                                channelConfigs: java.util.Map[String, _],
                                metrics: Metrics,
                                credentialProvider: CredentialProvider) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
@@ -414,7 +419,7 @@ private[kafka] class Processor(val id: Int,
     "socket-server",
     metricTags,
     false,
-    ChannelBuilders.serverChannelBuilder(protocol, channelConfigs, credentialProvider.credentialCache))
+    ChannelBuilders.serverChannelBuilder(securityProtocol, channelConfigs, credentialProvider.credentialCache))
 
   override def run() {
     startupComplete()
@@ -505,7 +510,9 @@ private[kafka] class Processor(val id: Int,
           val channel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
           RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), channel.socketAddress)
         }
-        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
+        val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
+          buffer = receive.payload, startTimeMs = time.milliseconds, listenerName = listenerName,
+          securityProtocol = securityProtocol)
         requestChannel.sendRequest(req)
         selector.mute(receive.source)
       } catch {
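
Each listener now gets its own contiguous block of network processor threads, and acceptors are keyed by the full endpoint rather than by security protocol. A small sketch of the index bookkeeping in the startup loop above, with illustrative names throughout:

    object ProcessorAssignmentSketch {
      // Assign a contiguous slice of processor ids to each listener, in order,
      // mirroring SocketServer's processorBeginIndex/processorEndIndex loop.
      def assign(listenerNames: Seq[String], numProcessorThreads: Int): Map[String, Range] = {
        var begin = 0
        listenerNames.map { name =>
          val end = begin + numProcessorThreads
          val slice = begin until end
          begin = end
          name -> slice
        }.toMap
      }

      def main(args: Array[String]): Unit =
        // Two listeners, three network threads each: INTERNAL gets 0..2, EXTERNAL 3..5.
        assign(Seq("INTERNAL", "EXTERNAL"), 3).foreach { case (n, r) => println(s"$n -> $r") }
    }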

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 794c23d..5c0201b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -37,6 +37,7 @@ import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe,
 import kafka.utils.{Logging, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
 import org.apache.kafka.common.record.{MemoryRecords, Record}
 import org.apache.kafka.common.requests._
@@ -785,13 +786,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
   }
 
-  private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), securityProtocol)
+  private def getOrCreateGroupMetadataTopic(listenerName: ListenerName): MetadataResponse.TopicMetadata = {
+    val topicMetadata = metadataCache.getTopicMetadata(Set(Topic.GroupMetadataTopicName), listenerName)
     topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
   }
 
-  private def getTopicMetadata(topics: Set[String], securityProtocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
-    val topicResponses = metadataCache.getTopicMetadata(topics, securityProtocol, errorUnavailableEndpoints)
+  private def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Seq[MetadataResponse.TopicMetadata] = {
+    val topicResponses = metadataCache.getTopicMetadata(topics, listenerName, errorUnavailableEndpoints)
     if (topics.isEmpty || topicResponses.size == topics.size) {
       topicResponses
     } else {
@@ -866,7 +867,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (authorizedTopics.isEmpty)
         Seq.empty[MetadataResponse.TopicMetadata]
       else
-        getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints)
+        getTopicMetadata(authorizedTopics, request.listenerName, errorUnavailableEndpoints)
 
     val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
 
@@ -876,7 +877,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       brokers.mkString(","), request.header.correlationId, request.header.clientId))
 
     val responseBody = new MetadataResponse(
-      brokers.map(_.getNode(request.securityProtocol)).asJava,
+      brokers.map(_.getNode(request.listenerName)).asJava,
       clusterId,
       metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
       completeTopicMetadata.asJava,
@@ -953,7 +954,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)
 
       // get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
+      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
 
       val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
         new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
