http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 73fee6c..7f0befe 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -27,9 +27,9 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, Message
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef.ValidList
-import org.apache.kafka.common.config.SaslConfigs
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
 import org.apache.kafka.common.metrics.{MetricsReporter, Sensor}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.record.TimestampType
 
@@ -58,6 +58,11 @@ object Defaults {
   /** ********* Socket Server Configuration ***********/
   val Port = 9092
   val HostName: String = new String("")
+
+  val ListenerSecurityProtocolMap: String = EndPoint.DefaultSecurityProtocolMap.map { case (listenerName, securityProtocol) =>
+    s"${listenerName.value}:${securityProtocol.name}"
+  }.mkString(",")
+
   val SocketSendBufferBytes: Int = 100 * 1024
   val SocketReceiveBufferBytes: Int = 100 * 1024
   val SocketRequestMaxBytes: Int = 100 * 1024 * 1024
@@ -223,6 +228,7 @@ object KafkaConfig {
   val AdvertisedHostNameProp: String = "advertised.host.name"
   val AdvertisedPortProp = "advertised.port"
   val AdvertisedListenersProp = "advertised.listeners"
+  val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
   val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
   val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
@@ -295,6 +301,7 @@ object KafkaConfig {
   val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
   val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
   val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
+  val InterBrokerListenerNameProp = "inter.broker.listener.name"
   /** ********* Controlled shutdown configuration ***********/
   val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
  val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
@@ -387,12 +394,13 @@ object KafkaConfig {
  val HostNameDoc = "DEPRECATED: only used when `listeners` is not set. " +
  "Use `listeners` instead. \n" +
  "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces"
-  val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and their protocols.\n" +
+  val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." +
+  s" If the listener name is not a security protocol, $ListenerSecurityProtocolMapProp must also be set.\n" +
   " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" +
   " Leave hostname empty to bind to default interface.\n" +
   " Examples of legal listener lists:\n" +
-  " PLAINTEXT://myhost:9092,TRACE://:9091\n" +
-  " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n"
+  " PLAINTEXT://myhost:9092,SSL://:9091\n" +
+  " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n"
  val AdvertisedHostNameDoc = "DEPRECATED: only used when `advertised.listeners` or `listeners` are not set. " +
  "Use `advertised.listeners` instead. \n" +
  "Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " +
@@ -407,6 +415,12 @@ object KafkaConfig {
  val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." +
  " In IaaS environments, this may need to be different from the interface to which the broker binds." +
  " If this is not set, the value for `listeners` will be used."
+  val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " +
+    "the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " +
+    "external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " +
+    "and EXTERNAL and this property as: `INTERNAL:SSL,EXTERNAL:SSL`. As shown, key and value are separated by a colon " +
+    "and map entries are separated by commas. Each listener name should only appear once in the map."
+
  val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
  val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."
  val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request"
@@ -504,10 +518,13 @@ object KafkaConfig {
  val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller"
  val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss"
  val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Valid values are: " +
-    s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}."
+    s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}. It is an error to set this and " +
+    s"$InterBrokerListenerNameProp properties at the same time."
  val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" +
  " This is typically bumped after all brokers were upgraded to a new version.\n" +
  " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list."
+  val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " +
+    s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time."
  /** ********* Controlled shutdown configuration ***********/
  val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens"
  val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying."
@@ -617,6 +634,7 @@ object KafkaConfig {
      .define(AdvertisedHostNameProp, STRING, null, HIGH, AdvertisedHostNameDoc)
      .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
      .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
+      .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
      .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
      .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
      .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
@@ -692,6 +710,7 @@ object KafkaConfig {
      .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc)
      .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc)
      .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc)
+      .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc)
 
       /** ********* Controlled shutdown configuration ***********/
      .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc)
@@ -885,7 +904,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
  val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp)
  val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp)
  val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp)
-  val interBrokerSecurityProtocol = SecurityProtocol.forName(getString(KafkaConfig.InterBrokerSecurityProtocolProp))
+
+  val (interBrokerListenerName, interBrokerSecurityProtocol) = getInterBrokerListenerNameAndSecurityProtocol
+
  // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0`
  // is passed, `0.10.0-IV0` may be picked)
  val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp)
@@ -953,9 +974,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
 
   val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp)
   val compressionType = getString(KafkaConfig.CompressionTypeProp)
-
-  val listeners = getListeners
-  val advertisedListeners = getAdvertisedListeners
+  val listeners: Seq[EndPoint] = getListeners
+  val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
+  private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap
 
   private def getLogRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
@@ -980,45 +1001,57 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
     }
   }
 
-  private def validateUniquePortAndProtocol(listeners: String) {
-
-    val endpoints = try {
-      val listenerList = CoreUtils.parseCsvList(listeners)
-      listenerList.map(listener => EndPoint.createEndPoint(listener))
-    } catch {
-      case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage))
-    }
-    // filter port 0 for unit tests
-    val endpointsWithoutZeroPort = endpoints.map(ep => ep.port).filter(_ != 0)
-    val distinctPorts = endpointsWithoutZeroPort.distinct
-    val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct
-
-    require(distinctPorts.size == endpointsWithoutZeroPort.size, "Each listener must have a different port")
-    require(distinctProtocols.size == endpoints.size, "Each listener must have a different protocol")
-  }
-
  // If the user did not define listeners but did define host or port, let's use them in backward compatible way
   // If none of those are defined, we default to PLAINTEXT://:9092
-  private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
-    if (getString(KafkaConfig.ListenersProp) != null) {
-      validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp))
-      CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp))
-    } else {
-      CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port)
-    }
+  private def getListeners: Seq[EndPoint] = {
+    Option(getString(KafkaConfig.ListenersProp)).map { listenerProp =>
+      CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap)
+    }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
   }
 
   // If the user defined advertised listeners, we use those
  // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
  // If none of these are defined, we'll use the listeners
-  private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = {
-    if (getString(KafkaConfig.AdvertisedListenersProp) != null) {
-      validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp))
-      CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp))
-    } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) {
-      CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort)
-    } else {
-      getListeners()
+  private def getAdvertisedListeners: Seq[EndPoint] = {
+    val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp)
+    if (advertisedListenersProp != null)
+      CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap)
+    else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null)
+      CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap)
+    else
+      getListeners
+  }
+
+  private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = {
+    Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match {
+      case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) =>
+        throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " +
+          s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.")
+      case Some(name) =>
+        val listenerName = ListenerName.normalised(name)
+        val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+          throw new ConfigException(s"Listener with name ${listenerName.value} defined in " +
+            s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
+        (listenerName, securityProtocol)
+      case None =>
+        val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp),
+          KafkaConfig.InterBrokerSecurityProtocolProp)
+        (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+    }
+  }
+
+  private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
+    try SecurityProtocol.forName(protocolName)
+    catch {
+      case e: IllegalArgumentException =>
+        throw new ConfigException(s"Invalid security protocol `$protocolName` defined in $configName")
+    }
+  }
+
+  private def getListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
+    getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+      .map { case (listenerName, protocolName) =>
+      ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp)
     }
   }
 
@@ -1043,12 +1076,16 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
      "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor")
    require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." +
      " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(","))
-    require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol),
-      s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured set of ${KafkaConfig.AdvertisedListenersProp}. " +
-      s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}")
-    require(advertisedListeners.keySet.subsetOf(listeners.keySet),
-      s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " +
-      s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}"
+
+    val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet
+    val listenerNames = listeners.map(_.listenerName).toSet
+    require(advertisedListenerNames.contains(interBrokerListenerName),
+      s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+      s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
+    require(advertisedListenerNames.subsetOf(listenerNames),
+      s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " +
+      s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " +
+      s"are ${listenerNames.map(_.value).mkString(",")}"
    )
    require(interBrokerProtocolVersion >= logMessageFormatVersion,
      s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")

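For illustration, a minimal sketch of how the three properties touched above fit together (property names come from this diff; the zookeeper.connect value is a placeholder). Two listener names share the SSL protocol, and the inter-broker listener is chosen by name, which getInterBrokerListenerNameAndSecurityProtocol then resolves through the map:

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
    // Two listeners sharing the SSL security protocol, distinguished by name.
    props.put(KafkaConfig.ListenersProp, "INTERNAL://:9092,EXTERNAL://:9093")
    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "INTERNAL:SSL,EXTERNAL:SSL")
    // Pick the inter-broker listener by name; also setting
    // security.inter.broker.protocol would now fail with a ConfigException.
    props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")

    val config = KafkaConfig.fromProps(props)
    assert(config.interBrokerSecurityProtocol.name == "SSL")
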
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 4133145..c7b398f 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -26,6 +26,7 @@ import kafka.cluster.EndPoint
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.I0Itec.zkclient.IZkStateListener
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.zookeeper.Watcher.Event.KeeperState
 
@@ -38,7 +39,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
  * we are dead.
  */
 class KafkaHealthcheck(brokerId: Int,
-                       advertisedEndpoints: Map[SecurityProtocol, EndPoint],
+                       advertisedEndpoints: Seq[EndPoint],
                        zkUtils: ZkUtils,
                        rack: Option[String],
                       interBrokerProtocolVersion: ApiVersion) extends Logging {
@@ -55,17 +56,18 @@ class KafkaHealthcheck(brokerId: Int,
    */
   def register() {
    val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt
-    val updatedEndpoints = advertisedEndpoints.mapValues(endpoint =>
+    val updatedEndpoints = advertisedEndpoints.map(endpoint =>
      if (endpoint.host == null || endpoint.host.trim.isEmpty)
-        EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType)
+        endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName)
      else
        endpoint
    )

-    // the default host and port are here for compatibility with older client
-    // only PLAINTEXT is supported as default
-    // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
-    val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
+    // the default host and port are here for compatibility with older clients that only support PLAINTEXT
+    // we choose the first plaintext port, if there is one
+    // or we register an empty endpoint, which means that older clients will not be able to connect
+    val plaintextEndpoint = updatedEndpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse(
+      new EndPoint(null, -1, null, null))
    zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack,
      interBrokerProtocolVersion)
   }

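The PLAINTEXT fallback in register() can be seen in isolation with a small sketch (hypothetical endpoints, using the four-field EndPoint introduced by this change):

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Hypothetical advertised endpoints without any PLAINTEXT listener.
    val endpoints = Seq(
      EndPoint("broker1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.SSL),
      EndPoint("broker1", 9094, new ListenerName("EXTERNAL"), SecurityProtocol.SSL))

    // Same selection as above: the first PLAINTEXT endpoint if one exists,
    // otherwise an empty endpoint that older clients cannot connect to.
    val plaintextEndpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT)
      .getOrElse(new EndPoint(null, -1, null, null))
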
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7cf3940..79548e8 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -253,11 +253,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
        dynamicConfigManager.startup()

        /* tell everyone we are alive */
-        val listeners = config.advertisedListeners.map {case(protocol, endpoint) =>
+        val listeners = config.advertisedListeners.map { endpoint =>
          if (endpoint.port == 0)
-            (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
+            endpoint.copy(port = socketServer.boundPort(endpoint.listenerName))
          else
-            (protocol, endpoint)
+            endpoint
        }
        kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack,
          config.interBrokerProtocolVersion)
@@ -345,7 +345,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
  private def controlledShutdown() {

    def node(broker: Broker): Node = {
-      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol)
+      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
       new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
     }
 
@@ -482,8 +482,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
                if (channel != null)
                  channel.disconnect()

-                channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host,
-                  broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port,
+                val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
+                channel = new BlockingChannel(brokerEndPoint.host,
+                  brokerEndPoint.port,
                   BlockingChannel.UseDefaultBufferSize,
                   BlockingChannel.UseDefaultBufferSize,
                   config.controllerSocketTimeoutMs)
@@ -619,7 +620,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP

  def getLogManager(): LogManager = logManager

-  def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = socketServer.boundPort(protocol)
+  def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)

  private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)

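The pattern at call sites throughout this commit is the same: where a SecurityProtocol used to be passed, the listener name it implies is passed instead. A sketch (the server value is a placeholder for a KafkaServer instance):

    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Before: server.boundPort(SecurityProtocol.PLAINTEXT)
    // After: the protocol is wrapped in the listener name it implies.
    val port = server.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
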
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/MetadataCache.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index feef6ae..5c28e14 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.util.EnumMap
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.{Seq, Set, mutable}
@@ -29,7 +28,8 @@ import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import org.apache.kafka.common.Node
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest}
 
 /**
@@ -41,7 +41,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
  private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
  private var controllerId: Option[Int] = None
  private val aliveBrokers = mutable.Map[Int, Broker]()
-  private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]()
+  private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
 
   this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] "
@@ -49,10 +49,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
  // This method is the main hotspot when it comes to the performance of metadata requests,
  // we should be careful about adding additional logic here.
  // filterUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol, filterUnavailableEndpoints: Boolean): Seq[Node] = {
+  private def getEndpoints(brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = {
    val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size))
    brokers.foreach { brokerId =>
-      val endpoint = getAliveEndpoint(brokerId, protocol) match {
+      val endpoint = getAliveEndpoint(brokerId, listenerName) match {
        case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None
         case Some(node) => Some(node)
       }
@@ -61,23 +61,23 @@ private[server] class MetadataCache(brokerId: Int) extends 
Logging {
     result
   }
 
-  private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[Node] =
+  private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
    aliveNodes.get(brokerId).map { nodeMap =>
-      nodeMap.getOrElse(protocol,
-        throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`"))
+      nodeMap.getOrElse(listenerName,
+        throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
+  private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
     cache.get(topic).map { partitions =>
       partitions.map { case (partitionId, partitionState) =>
         val topicPartition = TopicAndPartition(topic, partitionId)
 
        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
-        val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol)
+        val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, listenerName)
 
         val replicas = partitionState.allReplicas
-        val replicaInfo = getEndpoints(replicas, protocol, errorUnavailableEndpoints)
+        val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
 
         maybeLeader match {
           case None =>
@@ -87,7 +87,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
 
           case Some(leader) =>
             val isr = leaderAndIsr.isr
-            val isrInfo = getEndpoints(isr, protocol, errorUnavailableEndpoints)
+            val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)
 
             if (replicaInfo.size < replicas.size) {
              debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
@@ -110,10 +110,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
-  def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
+  def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
    inReadLock(partitionMetadataLock) {
      topics.toSeq.flatMap { topic =>
-        getPartitionMetadata(topic, protocol, errorUnavailableEndpoints).map { partitionMetadata =>
+        getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata =>
          new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
         }
       }
@@ -164,13 +164,16 @@ private[server] class MetadataCache(brokerId: Int) extends Logging {
       aliveNodes.clear()
       aliveBrokers.clear()
       updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
-        val nodes = new EnumMap[SecurityProtocol, Node](classOf[SecurityProtocol])
-        val endPoints = new EnumMap[SecurityProtocol, EndPoint](classOf[SecurityProtocol])
-        broker.endPoints.asScala.foreach { case (protocol, ep) =>
-          endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
-          nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
+        // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
+        // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
+        // move to `AnyRefMap`, which has comparable performance.
+        val nodes = new java.util.HashMap[ListenerName, Node]
+        val endPoints = new mutable.ArrayBuffer[EndPoint]
+        broker.endPoints.asScala.foreach { ep =>
+          endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
+          nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))
        }
-        aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack))
+        aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
         aliveNodes(broker.id) = nodes.asScala
       }
 

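A sketch of the new cache shape (hypothetical hosts): each broker now contributes one Node per listener name, and metadata responses are answered with the entry for the listener the request arrived on:

    import org.apache.kafka.common.Node
    import org.apache.kafka.common.network.ListenerName

    val nodes = new java.util.HashMap[ListenerName, Node]
    nodes.put(new ListenerName("INTERNAL"), new Node(0, "broker0.internal", 9092))
    nodes.put(new ListenerName("EXTERNAL"), new Node(0, "broker0.example.com", 19092))
    // Asking for a listener the broker does not expose, e.g. getAliveEndpoint(0,
    // new ListenerName("REPLICATION")), throws BrokerEndPointNotAvailableException.
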
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 87b8d90..199bb67 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -877,7 +877,7 @@ class ReplicaManager(val config: KafkaConfig,
        // we do not need to check if the leader exists again since this has been done at the beginning of this process
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
          partition.topicPartition -> BrokerAndInitialOffset(
-            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol),
+            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
            partition.getReplica().get.logEndOffset.messageOffset)).toMap
        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 8f86f66..fa8febc 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -32,6 +32,7 @@ import kafka.client.ClientUtils
 import kafka.network.BlockingChannel
 import kafka.api.PartitionOffsetRequestInfo
 import org.I0Itec.zkclient.exception.ZkNoNodeException
+import org.apache.kafka.common.network.ListenerName
 
 object ConsumerOffsetChecker extends Logging {
 
@@ -42,7 +43,7 @@ object ConsumerOffsetChecker extends Logging {
  private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = {
    try {
      zkUtils.getBrokerInfo(bid)
-        .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
+        .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
        .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker"))
        .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid)))
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 96a33b1..8c6a8ba 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -18,10 +18,11 @@
 package kafka.tools
 
 import org.I0Itec.zkclient.ZkClient
-import kafka.consumer.{SimpleConsumer, ConsumerConfig}
-import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
-import kafka.common.{TopicAndPartition, KafkaException}
-import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils}
+import kafka.consumer.{ConsumerConfig, SimpleConsumer}
+import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
+import kafka.common.{KafkaException, TopicAndPartition}
+import kafka.utils.{CoreUtils, ZKGroupTopicDirs, ZkUtils}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
@@ -67,9 +68,8 @@ object UpdateOffsetsInZK {
 
       zkUtils.getBrokerInfo(broker) match {
        case Some(brokerInfo) =>
-          val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host,
-                                            brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port,
-                                            10000, 100 * 1024, "UpdateOffsetsInZk")
+          val brokerEndPoint = brokerInfo.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+          val consumer = new SimpleConsumer(brokerEndPoint.host, brokerEndPoint.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
          val topicAndPartition = TopicAndPartition(topic, partition)
          val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
          val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/utils/CoreUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 7fe9cc9..d427e9c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol
 import scala.collection._
 import scala.collection.mutable
 import kafka.cluster.EndPoint
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.Utils
 
 /**
@@ -254,9 +255,26 @@ object CoreUtils extends Logging {
       .keys
   }
 
-  def listenerListToEndPoints(listeners: String): immutable.Map[SecurityProtocol, EndPoint] = {
-    val listenerList = parseCsvList(listeners)
-    listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap
+  def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
+    def validate(endPoints: Seq[EndPoint]): Unit = {
+      // filter port 0 for unit tests
+      val portsExcludingZero = endPoints.map(_.port).filter(_ != 0)
+      val distinctPorts = portsExcludingZero.distinct
+      val distinctListenerNames = endPoints.map(_.listenerName).distinct
+
+      require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners")
+      require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners")
+    }
+
+    val endPoints = try {
+      val listenerList = parseCsvList(listeners)
+      listenerList.map(EndPoint.createEndPoint(_, Some(securityProtocolMap)))
+    } catch {
+      case e: Exception =>
+        throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e)
+    }
+    validate(endPoints)
+    endPoints
   }
 
   def generateUuidAsBase64(): String = {

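A usage sketch of the new signature (listener names and ports are hypothetical):

    import kafka.utils.CoreUtils
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    val securityProtocolMap = Map(
      ListenerName.normalised("INTERNAL") -> SecurityProtocol.PLAINTEXT,
      ListenerName.normalised("EXTERNAL") -> SecurityProtocol.SSL)

    // Listener names that are not security protocols resolve through the map;
    // duplicate ports or duplicate names fail the require() checks above.
    val endPoints = CoreUtils.listenerListToEndPoints(
      "INTERNAL://localhost:9092,EXTERNAL://localhost:9093", securityProtocolMap)
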
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index fcb5648..7a6bd63 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMars
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import org.I0Itec.zkclient.{ZkClient, ZkConnection}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.utils.Time
 import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback}
@@ -252,10 +253,6 @@ class ZkUtils(val zkClient: ZkClient,
     brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get)
   }
 
-  def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = {
-    getAllBrokersInCluster().map(_.getBrokerEndPoint(protocolType))
-  }
-
  def getLeaderAndIsrForPartition(topic: String, partition: Int):Option[LeaderAndIsr] = {
    ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topic, partition).map(_.leaderAndIsr)
   }
@@ -266,15 +263,8 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   def getLeaderForPartition(topic: String, partition: Int): Option[Int] = {
-    val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsr) =>
-        Json.parseFull(leaderAndIsr) match {
-          case Some(m) =>
-            Some(m.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int])
-          case None => None
-        }
-      case None => None
+    readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1.flatMap { leaderAndIsr =>
+      Json.parseFull(leaderAndIsr).map(_.asInstanceOf[Map[String, Any]]("leader").asInstanceOf[Int])
     }
   }
 
@@ -341,11 +331,11 @@ class ZkUtils(val zkClient: ZkClient,
   }
 
   /**
-   * Register brokers with v3 json format (which includes multiple endpoints and rack) if
+   * Register brokers with v4 json format (which includes multiple endpoints and rack) if
   * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise.
   * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2.
-   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade
-   * to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
+   * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to
+   * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case).
   *
   * This format also includes default endpoints for compatibility with older clients.
    *
@@ -360,25 +350,15 @@ class ZkUtils(val zkClient: ZkClient,
  def registerBrokerInZk(id: Int,
                         host: String,
                         port: Int,
-                         advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint],
+                         advertisedEndpoints: Seq[EndPoint],
                          jmxPort: Int,
                          rack: Option[String],
                          apiVersion: ApiVersion) {
     val brokerIdPath = BrokerIdsPath + "/" + id
-    val timestamp = Time.SYSTEM.milliseconds.toString
-
-    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 3 else 2
-    var jsonMap = Map("version" -> version,
-                      "host" -> host,
-                      "port" -> port,
-                      "endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray,
-                      "jmx_port" -> jmxPort,
-                      "timestamp" -> timestamp
-    )
-    rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack))
-
-    val brokerInfo = Json.encode(jsonMap)
-    registerBrokerInZk(brokerIdPath, brokerInfo)
+    // see method documentation for reason why we do this
+    val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2
+    val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack)
+    registerBrokerInZk(brokerIdPath, json)
 
    info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(",")))
   }

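For reference, a sketch of the call and the v4 payload it produces (Broker.toJson lives outside this excerpt, so the exact JSON layout is an assumption; what the diff guarantees is that endpoints now carry listener names):

    import kafka.cluster.{Broker, EndPoint}
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    val endpoints = Seq(
      EndPoint("broker1", 9092, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
    // Assumed shape, roughly:
    // {"version":4,"host":"broker1","port":9092,"jmx_port":-1,"timestamp":"...",
    //  "endpoints":["INTERNAL://broker1:9092"],
    //  "listener_security_protocol_map":{"INTERNAL":"PLAINTEXT"},"rack":"r1"}
    val json = Broker.toJson(4, 0, "broker1", 9092, endpoints, -1, Some("r1"))
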
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a2fc2d5..9e1efa6 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -40,6 +40,7 @@ import scala.collection.mutable
 import scala.collection.mutable.Buffer
 import org.apache.kafka.common.KafkaException
 import kafka.admin.AdminUtils
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.record.MemoryRecords
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
@@ -208,15 +209,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createUpdateMetadataRequest = {
    val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava
+    val securityProtocol = SecurityProtocol.PLAINTEXT
    val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
-      Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava
+      Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol,
+        ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava
    new requests.UpdateMetadataRequest.Builder(brokerId, Int.MaxValue, partitionState, brokers).build()
   }
 
   private def createJoinGroupRequest = {
     new JoinGroupRequest.Builder(group, 10000, "", "consumer",
      List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava)
-      .setRebalanceTimeout(60000).build();
+      .setRebalanceTimeout(60000).build()
   }
 
   private def createSyncGroupRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
index 8d676d1..852377c 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala
@@ -113,7 +113,7 @@ class ProducerBounceTest extends KafkaServerTestHarness {
    val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i))
    val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) =>
      // Consumers must be instantiated after all the restarts since they use random ports each time they start up
-      val consumer = new SimpleConsumer("localhost", servers(leader).boundPort(), 100, 1024 * 1024, "")
+      val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 100, 1024 * 1024, "")
      val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition)
       consumer.close
       response

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index fc1ceec..874637b 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    var producer = new KafkaProducer[Array[Byte],Array[Byte]](props)
-    val consumer = new SimpleConsumer("localhost", server.boundPort(), 100, 1024*1024, "")
+    val consumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 100, 1024*1024, "")
 
     try {
       // create topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
new file mode 100644
index 0000000..5bd6414
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
@@ -0,0 +1,124 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io.File
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+
+import kafka.common.Topic
+import kafka.coordinator.OffsetConfig
+import kafka.utils.{CoreUtils, TestUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness {
+
+  private val trustStoreFile = File.createTempFile("truststore", ".jks")
+  private val servers = new ArrayBuffer[KafkaServer]
+  private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]()
+  private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]()
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    // 2 brokers so that we can test that the data propagates correctly via UpdateMetadataRequest
+    val numServers = 2
+
+    (0 until numServers).foreach { brokerId =>
+
+      val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile))
+      // Ensure that we can support multiple listeners per security protocol and multiple security protocols
+      props.put(KafkaConfig.ListenersProp, "SECURE_INTERNAL://localhost:0, INTERNAL://localhost:0, " +
+        "SECURE_EXTERNAL://localhost:0, EXTERNAL://localhost:0")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "INTERNAL:PLAINTEXT, SECURE_INTERNAL:SSL," +
+        "EXTERNAL:PLAINTEXT, SECURE_EXTERNAL:SSL")
+      props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+      props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId"))
+
+      servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+    }
+
+    val serverConfig = servers.head.config
+    assertEquals(4, serverConfig.listeners.size)
+
+    TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+      replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
+
+    serverConfig.listeners.foreach { endPoint =>
+      val listenerName = endPoint.listenerName
+
+      TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers)
+
+      val trustStoreFile =
+        if (endPoint.securityProtocol == SecurityProtocol.SSL) Some(this.trustStoreFile)
+        else None
+
+      val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
+
+      producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1,
+        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile)
+
+      consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value,
+        securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile)
+    }
+  }
+
+  @After
+  override def tearDown() {
+    producers.values.foreach(_.close())
+    consumers.values.foreach(_.close())
+    servers.foreach { s =>
+      s.shutdown()
+      CoreUtils.delete(s.config.logDirs)
+    }
+    super.tearDown()
+  }
+
+  /**
+    * Tests that we can produce and consume to/from all broker-defined listeners and security protocols. We produce
+    * with acks=-1 to ensure that replication is also working.
+    */
+  @Test
+  def testProduceConsume(): Unit = {
+    producers.foreach { case (listenerName, producer) =>
+      val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes,
+        s"value$i".getBytes))
+      producerRecords.map(producer.send(_)).map(_.get(10, TimeUnit.SECONDS))
+
+      val consumer = consumers(listenerName)
+      consumer.subscribe(Collections.singleton(listenerName.value))
+      val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]
+      TestUtils.waitUntilTrue(() => {
+        records ++= consumer.poll(50).asScala
+        records.size == producerRecords.size
+      }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 0f846e1..d95d90d 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -22,11 +22,12 @@ import org.junit.Assert._
 import org.apache.kafka.common.protocol.SecurityProtocol
 import kafka.zk.ZooKeeperTestHarness
 import kafka.utils.TestUtils._
-import kafka.utils.{ZkUtils, CoreUtils, TestUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.server.{KafkaConfig, KafkaServer}
-import org.junit.{Test, After, Before}
+import org.apache.kafka.common.network.ListenerName
+import org.junit.{After, Before, Test}
 
 class AddPartitionsTest extends ZooKeeperTestHarness {
   var configs: Seq[KafkaConfig] = null
@@ -47,7 +48,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
    configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false)))
     // start all the servers
     servers = configs.map(c => TestUtils.createServer(c))
-    brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort()))
+    brokers = servers.map(s => TestUtils.createBroker(s.config.brokerId, s.config.hostName, TestUtils.boundPort(s)))
 
     // create topics first
    createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers)
@@ -97,8 +98,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testIncrementPartitions",
-      2000,0).topicsMetadata
+    val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(listenerName)),
+      "AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata
    val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
    val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId)
     assertEquals(partitionDataForTopic1.size, 3)
@@ -123,8 +125,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testManualAssignmentOfReplicas",
-      2000,0).topicsMetadata
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
+      brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+      "AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata
    val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
    val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
     assertEquals(partitionDataForTopic2.size, 3)
@@ -148,8 +151,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementAllServers",
-      2000,0).topicsMetadata
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic3),
+      brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+      "AddPartitionsTest-testReplicaPlacementAllServers", 2000, 0).topicsMetadata
 
     val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
 
@@ -170,8 +174,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
 
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementPartialServers",
-      2000,0).topicsMetadata
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
+      brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+      "AddPartitionsTest-testReplicaPlacementPartialServers", 2000, 0).topicsMetadata
 
     val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
 

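These AddPartitionsTest hunks show the pattern this patch applies across the test suite: broker endpoints are looked up by ListenerName instead of by SecurityProtocol. A minimal sketch of that mapping, assuming only the two classes imported below (the values are illustrative):

    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    object ListenerNameExample extends App {
      // forSecurityProtocol derives the default listener name from the protocol
      // name, so PLAINTEXT traffic uses the listener named "PLAINTEXT".
      val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
      assert(listenerName.value == "PLAINTEXT")
      // Custom listener names are free-form and need not match any protocol name.
      assert(new ListenerName("INTERNAL").value == "INTERNAL")
    }
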
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index f2a2362..924daf8 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -18,15 +18,13 @@ package kafka.admin
 
 import java.util.Properties
 
-import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
 import kafka.log.LogConfig
 import kafka.log.LogConfig._
 import kafka.server.{ConfigType, DynamicConfig}
 import kafka.utils.CoreUtils._
 import kafka.utils.TestUtils._
-import kafka.utils.{CoreUtils, Logging, ZkUtils}
-import org.apache.kafka.common.protocol.SecurityProtocol
+import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils}
 import org.easymock.EasyMock._
 import org.easymock.{Capture, CaptureType, EasyMock}
 import org.junit.{Before, Test}
@@ -381,7 +379,7 @@ class ReassignPartitionsCommandTest extends Logging {
              brokers: Seq[Int] = Seq[Int]()): ZkUtils = {
     val zk = createMock(classOf[ZkUtils])
     expect(zk.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Seq[String]])).andStubReturn(existingAssignment)
-    expect(zk.getAllBrokersInCluster()).andStubReturn(brokers.map { id => new Broker(id, "", 1, SecurityProtocol.PLAINTEXT) })
+    expect(zk.getAllBrokersInCluster()).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1)))
     replay(zk)
     zk
   }

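The stubbed ZkUtils above now builds brokers through TestUtils.createBroker rather than a SecurityProtocol-keyed constructor. The helper itself is defined elsewhere in this patch; a hypothetical reconstruction, assuming the Broker and EndPoint shapes this commit introduces:

    import kafka.cluster.{Broker, EndPoint}
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    // Builds a broker with a single endpoint whose listener name is derived from
    // the security protocol (PLAINTEXT by default), matching the old behaviour.
    def createBroker(id: Int, host: String, port: Int,
                     protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Broker =
      Broker(id, Seq(EndPoint(host, port, ListenerName.forSecurityProtocol(protocol), protocol)), rack = None)
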
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index e93cae3..7806765 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -17,13 +17,15 @@
 
 package kafka.api
 
-import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError}
 import kafka.common._
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.common.TopicAndPartition
+import kafka.utils.TestUtils
+import TestUtils.createBroker
 import java.nio.ByteBuffer
 
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.utils.Time
 import org.junit._
@@ -74,9 +76,8 @@ object SerializationTestUtils {
     TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
   )
 
-  private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))),
-                             new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))),
-                             new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT))))
+  private val brokers = List(createBroker(0, "localhost", 1011), createBroker(1, "localhost", 1012),
+    createBroker(2, "localhost", 1013))
 
   def createTestProducerRequest: ProducerRequest = {
     new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest)
@@ -88,13 +89,9 @@ object SerializationTestUtils {
       TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
     ), ProducerRequest.CurrentVersion, 100)
 
-  def createTestFetchRequest: FetchRequest = {
-    new FetchRequest(requestInfo = requestInfos.toVector)
-  }
+  def createTestFetchRequest: FetchRequest = new FetchRequest(requestInfo = requestInfos.toVector)
 
-  def createTestFetchResponse: FetchResponse = {
-    FetchResponse(1, topicDataFetchResponse.toVector)
-  }
+  def createTestFetchResponse: FetchResponse = FetchResponse(1, topicDataFetchResponse.toVector)
 
   def createTestOffsetRequest = new OffsetRequest(
       collection.immutable.Map(TopicAndPartition(topic1, 1) -> PartitionOffsetRequestInfo(1000, 200)),
@@ -156,12 +153,11 @@ object SerializationTestUtils {
     ))
   }
 
-  def createConsumerMetadataRequest: GroupCoordinatorRequest = {
-    GroupCoordinatorRequest("group 1", clientId = "client 1")
-  }
+  def createConsumerMetadataRequest: GroupCoordinatorRequest = GroupCoordinatorRequest("group 1", clientId = "client 1")
 
   def createConsumerMetadataResponse: GroupCoordinatorResponse = {
-    GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), Errors.NONE.code, 0)
+    GroupCoordinatorResponse(Some(
+      brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE.code, 0)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index 5554b39..20b7e25 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -19,7 +19,8 @@ package kafka.cluster
 
 import java.nio.ByteBuffer
 
-import kafka.utils.Logging
+import kafka.utils.{Logging, TestUtils}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Test
 
@@ -29,14 +30,10 @@ class BrokerEndPointTest extends Logging {
 
   @Test
   def testHashAndEquals() {
-    val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
-    val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT)
-    val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT)
-    val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT)
-    val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1))
-    val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2))
-    val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3))
-    val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4))
+    val broker1 = TestUtils.createBroker(1, "myhost", 9092)
+    val broker2 = TestUtils.createBroker(1, "myhost", 9092)
+    val broker3 = TestUtils.createBroker(2, "myhost", 1111)
+    val broker4 = TestUtils.createBroker(1, "other", 1111)
 
     assert(broker1 == broker2)
     assert(broker1 != broker3)
@@ -64,31 +61,36 @@ class BrokerEndPointTest extends Logging {
     }"""
     val broker = Broker.createBroker(1, brokerInfoStr)
     assert(broker.id == 1)
-    assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).host == "localhost")
-    assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).port == 9093)
+    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    assert(brokerEndPoint.host == "localhost")
+    assert(brokerEndPoint.port == 9093)
   }
 
   @Test
   def testFromJsonV2 {
-    val brokerInfoStr = "{\"version\":2," +
-                          "\"host\":\"localhost\"," +
-                          "\"port\":9092," +
-                          "\"jmx_port\":9999," +
-                          "\"timestamp\":\"1416974968782\"," +
-                          "\"endpoints\":[\"PLAINTEXT://localhost:9092\"]}"
+    val brokerInfoStr = """{
+      "version":2,
+      "host":"localhost",
+      "port":9092,
+      "jmx_port":9999,
+      "timestamp":"1416974968782",
+      "endpoints":["PLAINTEXT://localhost:9092"]
+    }"""
     val broker = Broker.createBroker(1, brokerInfoStr)
     assert(broker.id == 1)
-    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "localhost")
-    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092)
+    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    assert(brokerEndPoint.host == "localhost")
+    assert(brokerEndPoint.port == 9092)
   }
 
   @Test
   def testFromJsonV1() = {
-    val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}"
+    val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
     val broker = Broker.createBroker(1, brokerInfoStr)
     assert(broker.id == 1)
-    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "172.16.8.243")
-    assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091)
+    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    assert(brokerEndPoint.host == "172.16.8.243")
+    assert(brokerEndPoint.port == 9091)
   }
 
   @Test
@@ -122,37 +124,37 @@ class BrokerEndPointTest extends Logging {
   @Test
   def testEndpointFromUri() {
     var connectionString = "PLAINTEXT://localhost:9092"
-    var endpoint = EndPoint.createEndPoint(connectionString)
+    var endpoint = EndPoint.createEndPoint(connectionString, None)
     assert(endpoint.host == "localhost")
     assert(endpoint.port == 9092)
     assert(endpoint.connectionString == "PLAINTEXT://localhost:9092")
     // KAFKA-3719
     connectionString = "PLAINTEXT://local_host:9092"
-    endpoint = EndPoint.createEndPoint(connectionString)
+    endpoint = EndPoint.createEndPoint(connectionString, None)
     assert(endpoint.host == "local_host")
     assert(endpoint.port == 9092)
     assert(endpoint.connectionString == "PLAINTEXT://local_host:9092")
     // also test for default bind
     connectionString = "PLAINTEXT://:9092"
-    endpoint = EndPoint.createEndPoint(connectionString)
+    endpoint = EndPoint.createEndPoint(connectionString, None)
     assert(endpoint.host == null)
     assert(endpoint.port == 9092)
     assert(endpoint.connectionString == "PLAINTEXT://:9092")
     // also test for ipv6
     connectionString = "PLAINTEXT://[::1]:9092"
-    endpoint = EndPoint.createEndPoint(connectionString)
+    endpoint = EndPoint.createEndPoint(connectionString, None)
     assert(endpoint.host == "::1")
     assert(endpoint.port == 9092)
     assert(endpoint.connectionString ==  "PLAINTEXT://[::1]:9092")
     // test for ipv6 with % character
     connectionString = "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092"
-    endpoint = EndPoint.createEndPoint(connectionString)
+    endpoint = EndPoint.createEndPoint(connectionString, None)
     assert(endpoint.host == "fe80::b1da:69ca:57f7:63d8%3")
     assert(endpoint.port == 9092)
     assert(endpoint.connectionString ==  "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092")
     // test hostname
     connectionString = "PLAINTEXT://MyHostname:9092"
-    endpoint = EndPoint.createEndPoint(connectionString)
+    endpoint = EndPoint.createEndPoint(connectionString, None)
     assert(endpoint.host == "MyHostname")
     assert(endpoint.port == 9092)
     assert(endpoint.connectionString ==  "PLAINTEXT://MyHostname:9092")

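Every createEndPoint call above gains a None argument: the new optional map from listener name to security protocol. None falls back to the default mapping, where each protocol owns a listener of the same name, which is why these tests pass None for protocol-named listeners. A sketch of a non-default lookup (the CLIENT listener name and the values are illustrative):

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    // With an explicit map, a custom listener name can resolve to any protocol.
    val protocolMap = Map(new ListenerName("CLIENT") -> SecurityProtocol.SSL)
    val endPoint = EndPoint.createEndPoint("CLIENT://myhost:9092", Some(protocolMap))
    assert(endPoint.listenerName.value == "CLIENT")
    assert(endPoint.securityProtocol == SecurityProtocol.SSL)
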
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
index 24ed954..b402b25 100644
--- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala
@@ -22,7 +22,7 @@ import java.io.File
 import kafka.admin.AdminUtils
 import kafka.api.TopicMetadataResponse
 import kafka.client.ClientUtils
-import kafka.cluster.{Broker, BrokerEndPoint}
+import kafka.cluster.BrokerEndPoint
 import kafka.server.{KafkaConfig, KafkaServer, NotRunning}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
@@ -52,7 +52,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     brokerEndPoints = Seq(
       // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
       // `securityProtocol` instead of PLAINTEXT below
-      new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, server1.boundPort(SecurityProtocol.PLAINTEXT))
+      new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, TestUtils.boundPort(server1))
     )
   }
 
@@ -69,7 +69,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1))
 
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
+      2000, 0).topicsMetadata
     assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
     assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
@@ -143,12 +143,12 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use
     // `securityProtocol` instead of PLAINTEXT below
     val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName,
-      adHocServer.boundPort(SecurityProtocol.PLAINTEXT))
+      TestUtils.boundPort(adHocServer))
 
     // auto create topic on "bad" endpoint
     val topic = "testAutoCreateTopic"
     val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
+      2000, 0).topicsMetadata
     assertEquals(Errors.INVALID_REPLICATION_FACTOR.code, topicsMetadata.head.errorCode)
     assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size)
     assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic)
@@ -163,7 +163,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
     val topic1 = "testAutoCreate_Topic"
     val topic2 = "testAutoCreate.Topic"
     var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic",
-      2000,0).topicsMetadata
+      2000, 0).topicsMetadata
     assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size)
     assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic)
     assertEquals(Errors.LEADER_NOT_AVAILABLE.code, topicsMetadata.head.errorCode)
@@ -176,7 +176,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
 
     // retry the metadata for the first auto created topic
     topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata",
-      2000,0).topicsMetadata
+      2000, 0).topicsMetadata
     assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode)
     assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode)
     var partitionMetadata = topicsMetadata.head.partitionsMetadata
@@ -188,24 +188,21 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
 
   private def checkIsr(servers: Seq[KafkaServer]): Unit = {
     val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
-    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map(
-      x => new BrokerEndPoint(x.config.brokerId,
-                              if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-                              x.boundPort())
-    )
+    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map { x =>
+      new BrokerEndPoint(x.config.brokerId,
+        if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
+        TestUtils.boundPort(x))
+    }
 
     // Assert that topic metadata at new brokers is updated correctly
     activeBrokers.foreach(x => {
       var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
       waitUntilTrue(() => {
-        metadata = ClientUtils.fetchTopicMetadata(
-                                Set.empty,
-                                Seq(new BrokerEndPoint(
-                                                  x.config.brokerId,
-                                                  if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
-                                                  x.boundPort())),
-                                "TopicMetadataTest-testBasicTopicMetadata",
-                                2000, 0)
+        metadata = ClientUtils.fetchTopicMetadata(Set.empty,
+                                Seq(new BrokerEndPoint(x.config.brokerId,
+                                                       if (x.config.hostName.nonEmpty) x.config.hostName else "localhost",
+                                                       TestUtils.boundPort(x))),
+                                "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
         metadata.topicsMetadata.nonEmpty &&
           metadata.topicsMetadata.head.partitionsMetadata.nonEmpty &&
           expectedIsr.sortBy(_.id) == metadata.topicsMetadata.head.partitionsMetadata.head.isr.sortBy(_.id)
@@ -263,9 +260,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness {
       waitUntilTrue(() => {
           val foundMetadata = ClientUtils.fetchTopicMetadata(
             Set.empty,
-            Seq(new Broker(x.config.brokerId,
-              x.config.hostName,
-              x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)),
+            Seq(new BrokerEndPoint(x.config.brokerId, x.config.hostName, TestUtils.boundPort(x))),
             "TopicMetadataTest-testBasicTopicMetadata", 2000, 0)
           topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) &&
             topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
index 003c04c..3f59302 100644
--- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
@@ -44,7 +44,9 @@ class FetcherTest extends KafkaServerTestHarness {
     super.setUp
     TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers)
 
-    val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())))
+    val cluster = new Cluster(servers.map { s =>
+      new Broker(s.config.brokerId, "localhost", boundPort(s), listenerName, securityProtocol)
+    })
 
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils)
     fetcher.stopConnections()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 4bbdedb..270fca2 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -19,6 +19,7 @@ package kafka.integration
 
 import java.io.File
 import java.util.Arrays
+
 import kafka.common.KafkaException
 import kafka.server._
 import kafka.utils.{CoreUtils, TestUtils}
@@ -26,9 +27,12 @@ import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.junit.{After, Before}
+
 import scala.collection.mutable.Buffer
 import java.util.Properties
 
+import org.apache.kafka.common.network.ListenerName
+
 /**
  * A test harness that brings up some number of broker nodes
  */
@@ -66,7 +70,10 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
 
   def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)
 
+  def boundPort(server: KafkaServer): Int = server.boundPort(listenerName)
+
   protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
+  protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol)
   protected def trustStoreFile: Option[File] = None
   protected def saslProperties: Option[Properties] = None
 

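With listenerName derived from securityProtocol, a harness subclass only overrides the protocol; the listener name and the bound-port lookup then follow from it. A minimal sketch (the class name and config generation are illustrative, and a real SSL run would also need trustStoreFile and the related SSL settings):

    import kafka.integration.KafkaServerTestHarness
    import kafka.server.KafkaConfig
    import kafka.utils.TestUtils
    import org.apache.kafka.common.protocol.SecurityProtocol

    class SslApiTest extends KafkaServerTestHarness {
      override protected def securityProtocol: SecurityProtocol = SecurityProtocol.SSL
      // listenerName is now ListenerName.forSecurityProtocol(SecurityProtocol.SSL),
      // so boundPort(servers.head) returns the first broker's SSL port.
      def generateConfigs() = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
    }
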
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
index bdf116f..e3115e1 100644
--- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
@@ -36,7 +36,7 @@ trait ProducerConsumerTestHarness extends KafkaServerTestHarness {
       encoder = classOf[StringEncoder].getName,
       keyEncoder = classOf[StringEncoder].getName,
       partitioner = classOf[StaticPartitioner].getName)
-    consumer = new SimpleConsumer(host, servers.head.boundPort(), 1000000, 64 * 1024, "")
+    consumer = new SimpleConsumer(host, TestUtils.boundPort(servers.head), 1000000, 64 * 1024, "")
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 8a0ae1a..b8e3a8a 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.NetworkSend
+import org.apache.kafka.common.network.{ListenerName, NetworkSend}
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
@@ -99,7 +99,7 @@ class SocketServerTest extends JUnitSuite {
   }
 
   def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = {
-    val socket = new Socket("localhost", s.boundPort(protocol))
+    val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)))
     sockets += socket
     socket
   }
@@ -280,7 +280,8 @@ class SocketServerTest extends JUnitSuite {
       val sslContext = SSLContext.getInstance("TLSv1.2")
       sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom())
       val socketFactory = sslContext.getSocketFactory
-      val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket]
+      val sslSocket = socketFactory.createSocket("localhost",
+        overrideServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))).asInstanceOf[SSLSocket]
       sslSocket.setNeedClientAuth(false)
 
       val apiKey = ApiKeys.PRODUCE.id
@@ -324,9 +325,10 @@ class SocketServerTest extends JUnitSuite {
     val serverMetrics = new Metrics
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = {
+      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+                                protocol: SecurityProtocol): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
-          config.connectionsMaxIdleMs, protocol, config.values, metrics, credentialProvider) {
+          config.connectionsMaxIdleMs, listenerName, protocol, config.values, metrics, credentialProvider) {
           override protected[network] def sendResponse(response: RequestChannel.Response) {
             conn.close()
             super.sendResponse(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index f5943d6..7a00f2a 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -36,6 +36,7 @@ import kafka.utils.TestUtils._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.Time
 
 @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")
@@ -49,7 +50,7 @@ class AsyncProducerTest {
   val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port = 65534))
   val configs = props.map(KafkaConfig.fromProps)
   val brokerList = configs.map { config =>
-    val endPoint = config.advertisedListeners.get(SecurityProtocol.PLAINTEXT).get
+    val endPoint = config.advertisedListeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get
     org.apache.kafka.common.utils.Utils.formatAddress(endPoint.host, endPoint.port)
   }.mkString(",")
 

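The advertisedListeners change above reflects the config's new shape: with named listeners, several listeners may share one security protocol, so the endpoints can no longer be keyed by protocol and the value becomes a Seq[EndPoint] searched with a predicate. A sketch of the lookup, with illustrative endpoint values and assuming this patch's EndPoint case class:

    import kafka.cluster.EndPoint
    import org.apache.kafka.common.network.ListenerName
    import org.apache.kafka.common.protocol.SecurityProtocol

    val advertisedListeners = Seq(
      EndPoint("broker1", 9092, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
      EndPoint("broker1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.SSL))
    // A Map[SecurityProtocol, EndPoint] could not hold these two entries: both
    // listeners use SSL. Lookup by protocol now returns the first match instead.
    val endPoint = advertisedListeners.find(_.securityProtocol == SecurityProtocol.SSL).get
    assert(endPoint.listenerName.value == "EXTERNAL")
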
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
index 769ea33..63ec83e 100755
--- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
@@ -53,13 +53,13 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{
   // ports and then get a consumer instance that will be pointed at the correct port
   def getConsumer1() = {
     if (consumer1 == null)
-      consumer1 = new SimpleConsumer("localhost", server1.boundPort(), 1000000, 64*1024, "")
+      consumer1 = new SimpleConsumer("localhost", TestUtils.boundPort(server1), 1000000, 64*1024, "")
     consumer1
   }
 
   def getConsumer2() = {
     if (consumer2 == null)
-      consumer2 = new SimpleConsumer("localhost", server2.boundPort(), 100, 64*1024, "")
+      consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 100, 64*1024, "")
     consumer2
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
index d63afe7..c20aab3 100644
--- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
@@ -27,7 +27,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.message._
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.Time
 import org.junit.Test
 import org.junit.Assert._
@@ -52,7 +52,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   def testReachableServer() {
     val server = servers.head
 
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
@@ -88,7 +88,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   @Test
   def testEmptyProduceRequest() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
 
     val correlationId = 0
@@ -106,7 +106,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   @Test
   def testMessageSizeTooLarge() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     TestUtils.createTopic(zkUtils, "test", numPartitions = 1, replicationFactor = 1, servers = servers)
@@ -133,7 +133,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   @Test
   def testMessageSizeTooLargeWithAckZero() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
     props.put("request.required.acks", "0")
 
@@ -159,7 +159,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   @Test
   def testProduceCorrectlyReceivesResponse() {
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
     val producer = new SyncProducer(new SyncProducerConfig(props))
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@@ -207,7 +207,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
     val timeoutMs = 500
 
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
     val producer = new SyncProducer(new SyncProducerConfig(props))
 
     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
@@ -233,7 +233,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   def testProduceRequestWithNoResponse() {
     val server = servers.head
 
-    val port = server.socketServer.boundPort(SecurityProtocol.PLAINTEXT)
+    val port = TestUtils.boundPort(server)
     val props = TestUtils.getSyncProducerConfig(port)
     val correlationId = 0
     val clientId = SyncProducerConfig.DefaultClientId
@@ -249,7 +249,7 @@ class SyncProducerTest extends KafkaServerTestHarness {
   def testNotEnoughReplicas()  {
     val topicName = "minisrtest"
     val server = servers.head
-    val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort())
+    val props = TestUtils.getSyncProducerConfig(boundPort(server))
 
     props.put("request.required.acks", "-1")
 
