Repository: kafka
Updated Branches:
  refs/heads/trunk 80d6c64c1 -> d25671884


http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala 
b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
index dc17aa4..8d4899b 100755
--- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
@@ -18,41 +18,79 @@
 package kafka.server
 
 import org.junit.Assert._
-import kafka.utils.{TestUtils, CoreUtils, ZkUtils}
+import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{Test, After, Before}
+import org.junit.{After, Test}
+
+import scala.collection.mutable.ArrayBuffer
 
 class AdvertiseBrokerTest extends ZooKeeperTestHarness {
-  var server : KafkaServer = null
+  val servers = ArrayBuffer[KafkaServer]()
+
   val brokerId = 0
-  val advertisedHostName = "routable-host"
-  val advertisedPort = 1234
 
-  @Before
-  override def setUp() {
-    super.setUp()
+  @After
+  override def tearDown() {
+    servers.foreach { s =>
+      s.shutdown()
+      CoreUtils.delete(s.config.logDirs)
+    }
+    super.tearDown()
+  }
 
+  @Test
+  def testBrokerAdvertiseHostNameAndPortToZK: Unit = {
+    val advertisedHostName = "routable-host1"
+    val advertisedPort = 1234
     val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
     props.put("advertised.host.name", advertisedHostName)
     props.put("advertised.port", advertisedPort.toString)
+    servers += TestUtils.createServer(KafkaConfig.fromProps(props))
 
-    server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    val brokerInfo = zkUtils.getBrokerInfo(brokerId).get
+    assertEquals(1, brokerInfo.endPoints.size)
+    val endpoint = brokerInfo.endPoints.head
+    assertEquals(advertisedHostName, endpoint.host)
+    assertEquals(advertisedPort, endpoint.port)
+    assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol)
+    assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value)
   }
 
-  @After
-  override def tearDown() {
-    server.shutdown()
-    CoreUtils.delete(server.config.logDirs)
-    super.tearDown()
+  @Test
+  def testBrokerAdvertiseListenersToZK: Unit = {
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+    props.put("advertised.listeners", "PLAINTEXT://routable-listener:3334")
+    servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+
+    val brokerInfo = zkUtils.getBrokerInfo(brokerId).get
+    assertEquals(1, brokerInfo.endPoints.size)
+    val endpoint = brokerInfo.endPoints.head
+    assertEquals("routable-listener", endpoint.host)
+    assertEquals(3334, endpoint.port)
+    assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol)
+    assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value)
   }
 
-  @Test
-  def testBrokerAdvertiseToZK {
-    val brokerInfo = zkUtils.getBrokerInfo(brokerId)
-    val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get
-    assertEquals(advertisedHostName, endpoint.host)
-    assertEquals(advertisedPort, endpoint.port)
+  @Test
+  def testBrokerAdvertiseListenersWithCustomNamesToZK: Unit = {
+    val props = TestUtils.createBrokerConfig(brokerId, zkConnect)
+    props.put("listeners", "INTERNAL://:0,EXTERNAL://:0")
+    props.put("advertised.listeners", 
"EXTERNAL://external-listener:9999,INTERNAL://internal-listener:10999")
+    props.put("listener.security.protocol.map", 
"INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT")
+    props.put("inter.broker.listener.name", "INTERNAL")
+    servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+
+    val brokerInfo = zkUtils.getBrokerInfo(brokerId).get
+    assertEquals(2, brokerInfo.endPoints.size)
+    val endpoint = brokerInfo.endPoints.head
+    assertEquals("external-listener", endpoint.host)
+    assertEquals(9999, endpoint.port)
+    assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol)
+    assertEquals("EXTERNAL", endpoint.listenerName.value)
+    val endpoint2 = brokerInfo.endPoints(1)
+    assertEquals("internal-listener", endpoint2.host)
+    assertEquals(10999, endpoint2.port)
+    assertEquals(SecurityProtocol.PLAINTEXT, endpoint2.securityProtocol)
+    assertEquals("INTERNAL", endpoint2.listenerName.value)
   }
   
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index 5c29935..6d3374f 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -25,6 +25,7 @@ import java.util.Properties
 import kafka.integration.KafkaServerTestHarness
 import kafka.network.SocketServer
 import kafka.utils._
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, SecurityProtocol}
 import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, 
ResponseHeader}
 import org.junit.Before
@@ -73,7 +74,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness 
{
   }
 
   def connect(s: SocketServer = anySocketServer, protocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT): Socket = {
-    new Socket("localhost", s.boundPort(protocol))
+    new Socket("localhost", 
s.boundPort(ListenerName.forSecurityProtocol(protocol)))
   }
 
   private def sendRequest(socket: Socket, request: Array[Byte]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
 
b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
index d3dbfe2..47a05ef 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala
@@ -40,7 +40,7 @@ class ControlledShutdownLeaderSelectorTest {
 
     val zkUtils = EasyMock.mock(classOf[ZkUtils])
     val controllerContext = new ControllerContext(zkUtils)
-    controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, 
None)).toSet
+    controllerContext.liveBrokers = assignment.map(Broker(_, Seq.empty, 
None)).toSet
     controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3)
     controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition 
-> assignment)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
index 2857fc7..5c53ffa 100755
--- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -25,6 +25,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.network.SocketServer
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.types.Type
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.record.MemoryRecords
@@ -45,7 +46,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness {
   private def socketServer = servers.head.socketServer
 
   private def connect(s: SocketServer = socketServer, protocol: 
SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
-    new Socket("localhost", s.boundPort(protocol))
+    new Socket("localhost", 
s.boundPort(ListenerName.forSecurityProtocol(protocol)))
   }
 
   private def sendRequest(socket: Socket, request: Array[Byte], id: 
Option[Short] = None) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d5e2ce3..c3fc600 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -20,9 +20,11 @@ package kafka.server
 import java.util.Properties
 
 import kafka.api.{ApiVersion, KAFKA_0_8_2}
+import kafka.cluster.EndPoint
 import kafka.message._
 import kafka.utils.{CoreUtils, TestUtils}
 import org.apache.kafka.common.config.ConfigException
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Assert._
 import org.junit.Test
@@ -84,6 +86,7 @@ class KafkaConfigTest {
     val cfg = KafkaConfig.fromProps(props)
     assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis)
   }
+
   @Test
   def testLogRetentionUnlimited() {
     val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port 
= 8181)
@@ -149,7 +152,7 @@ class KafkaConfigTest {
     props.put(KafkaConfig.PortProp, port)
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.advertisedListeners
-    val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
+    val endpoint = endpoints.find(_.securityProtocol == 
SecurityProtocol.PLAINTEXT).get
     assertEquals(endpoint.host, hostName)
     assertEquals(endpoint.port, port.toInt)
   }
@@ -165,7 +168,7 @@ class KafkaConfigTest {
 
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.advertisedListeners
-    val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
+    val endpoint = endpoints.find(_.securityProtocol == 
SecurityProtocol.PLAINTEXT).get
 
     assertEquals(endpoint.host, advertisedHostName)
     assertEquals(endpoint.port, advertisedPort.toInt)
@@ -182,7 +185,7 @@ class KafkaConfigTest {
 
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.advertisedListeners
-    val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
+    val endpoint = endpoints.find(_.securityProtocol == 
SecurityProtocol.PLAINTEXT).get
 
     assertEquals(endpoint.host, advertisedHostName)
     assertEquals(endpoint.port, port.toInt)
@@ -199,7 +202,7 @@ class KafkaConfigTest {
 
     val serverConfig = KafkaConfig.fromProps(props)
     val endpoints = serverConfig.advertisedListeners
-    val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get
+    val endpoint = endpoints.find(_.securityProtocol == 
SecurityProtocol.PLAINTEXT).get
 
     assertEquals(endpoint.host, hostName)
     assertEquals(endpoint.port, advertisedPort.toInt)
@@ -213,15 +216,15 @@ class KafkaConfigTest {
 
     // listeners with duplicate port
     props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9091,TRACE://localhost:9091")
-    assert(!isValidKafkaConfig(props))
+    assertFalse(isValidKafkaConfig(props))
 
     // listeners with duplicate protocol
     props.put(KafkaConfig.ListenersProp, 
"PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
-    assert(!isValidKafkaConfig(props))
+    assertFalse(isValidKafkaConfig(props))
 
     // advertised listeners with duplicate port
     props.put(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://localhost:9091,TRACE://localhost:9091")
-    assert(!isValidKafkaConfig(props))
+    assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
@@ -231,7 +234,96 @@ class KafkaConfigTest {
     props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
     props.put(KafkaConfig.ListenersProp, "BAD://localhost:9091")
 
-    assert(!isValidKafkaConfig(props))
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testListenerNamesWithAdvertisedListenerUnset(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+    props.put(KafkaConfig.ListenersProp, 
"CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093")
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT")
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+    val config = KafkaConfig.fromProps(props)
+    val expectedListeners = Seq(
+      EndPoint("localhost", 9091, new ListenerName("CLIENT"), 
SecurityProtocol.SSL),
+      EndPoint("localhost", 9092, new ListenerName("REPLICATION"), 
SecurityProtocol.SSL),
+      EndPoint("localhost", 9093, new ListenerName("INTERNAL"), 
SecurityProtocol.PLAINTEXT))
+    assertEquals(expectedListeners, config.listeners)
+    assertEquals(expectedListeners, config.advertisedListeners)
+    val expectedSecurityProtocolMap = Map(
+      new ListenerName("CLIENT") -> SecurityProtocol.SSL,
+      new ListenerName("REPLICATION") -> SecurityProtocol.SSL,
+      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
+    )
+    assertEquals(expectedSecurityProtocolMap, 
config.listenerSecurityProtocolMap)
+  }
+
+  @Test
+  def testListenerAndAdvertisedListenerNames(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+    props.put(KafkaConfig.ListenersProp, 
"EXTERNAL://localhost:9091,INTERNAL://localhost:9093")
+    props.put(KafkaConfig.AdvertisedListenersProp, 
"EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093")
+    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, 
"EXTERNAL:SSL,INTERNAL:PLAINTEXT")
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
+    val config = KafkaConfig.fromProps(props)
+
+    val expectedListeners = Seq(
+      EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), 
SecurityProtocol.SSL),
+      EndPoint("localhost", 9093, new ListenerName("INTERNAL"), 
SecurityProtocol.PLAINTEXT)
+    )
+    assertEquals(expectedListeners, config.listeners)
+
+    val expectedAdvertisedListeners = Seq(
+      EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), 
SecurityProtocol.SSL),
+      EndPoint("host1", 9093, new ListenerName("INTERNAL"), 
SecurityProtocol.PLAINTEXT)
+    )
+    assertEquals(expectedAdvertisedListeners, config.advertisedListeners)
+
+    val expectedSecurityProtocolMap = Map(
+      new ListenerName("EXTERNAL") -> SecurityProtocol.SSL,
+      new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT
+    )
+    assertEquals(expectedSecurityProtocolMap, 
config.listenerSecurityProtocolMap)
+  }
+
+  @Test
+  def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+    props.put(KafkaConfig.ListenersProp, 
"SSL://localhost:9091,REPLICATION://localhost:9092")
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): 
Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.BrokerIdProp, "1")
+    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
+
+    props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091")
+    props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL")
+    props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL")
+    assertFalse(isValidKafkaConfig(props))
   }
 
   @Test
@@ -240,10 +332,15 @@ class KafkaConfigTest {
     props.put(KafkaConfig.BrokerIdProp, "1")
     props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
     props.put(KafkaConfig.ListenersProp, 
"plaintext://localhost:9091,SsL://localhost:9092")
-
-    assert(isValidKafkaConfig(props))
+    val config = KafkaConfig.fromProps(props)
+    assertEquals(Some("SSL://localhost:9092"), 
config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString))
+    assertEquals(Some("PLAINTEXT://localhost:9091"), 
config.listeners.find(_.listenerName.value == 
"PLAINTEXT").map(_.connectionString))
   }
 
+  def listenerListToEndPoints(listenerList: String,
+                              securityProtocolMap: 
collection.Map[ListenerName, SecurityProtocol] = 
EndPoint.DefaultSecurityProtocolMap) =
+    CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap)
+
   @Test
   def testListenerDefaults() {
     val props = new Properties()
@@ -255,22 +352,22 @@ class KafkaConfigTest {
     props.put(KafkaConfig.PortProp, "1111")
 
     val conf = KafkaConfig.fromProps(props)
-    assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), 
conf.listeners)
+    assertEquals(listenerListToEndPoints("PLAINTEXT://myhost:1111"), 
conf.listeners)
 
     // configuration with null host
     props.remove(KafkaConfig.HostNameProp)
 
     val conf2 = KafkaConfig.fromProps(props)
-    assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), 
conf2.listeners)
-    assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), 
conf2.advertisedListeners)
-    assertEquals(null, conf2.listeners(SecurityProtocol.PLAINTEXT).host)
+    assertEquals(listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners)
+    assertEquals(listenerListToEndPoints("PLAINTEXT://:1111"), 
conf2.advertisedListeners)
+    assertEquals(null, conf2.listeners.find(_.securityProtocol == 
SecurityProtocol.PLAINTEXT).get.host)
 
     // configuration with advertised host and port, and no advertised listeners
     props.put(KafkaConfig.AdvertisedHostNameProp, "otherhost")
     props.put(KafkaConfig.AdvertisedPortProp, "2222")
 
     val conf3 = KafkaConfig.fromProps(props)
-    assertEquals(conf3.advertisedListeners, 
CoreUtils.listenerListToEndPoints("PLAINTEXT://otherhost:2222"))
+    assertEquals(conf3.advertisedListeners, 
listenerListToEndPoints("PLAINTEXT://otherhost:2222"))
   }
 
   @Test
@@ -295,7 +392,7 @@ class KafkaConfigTest {
     assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion)
 
     //check that latest is newer than 0.8.2
-    assert(ApiVersion.latestVersion >= conf3.interBrokerProtocolVersion)
+    assertTrue(ApiVersion.latestVersion >= conf3.interBrokerProtocolVersion)
   }
 
   private def isValidKafkaConfig(props: Properties): Boolean = {
@@ -303,7 +400,7 @@ class KafkaConfigTest {
       KafkaConfig.fromProps(props)
       true
     } catch {
-      case _: IllegalArgumentException => false
+      case _: IllegalArgumentException | _: ConfigException => false
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 7e85c19..0ceb71b 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -29,6 +29,7 @@ import kafka.controller.{ControllerChannelManager, 
ControllerContext}
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.utils.Time
 import org.junit.{After, Before, Test}
@@ -128,8 +129,11 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val controllerId = 2
 
     val controllerConfig = 
KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect))
-    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", 
s.boundPort()))
-    val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", 
TestUtils.boundPort(s), listenerName,
+      securityProtocol))
+    val nodes = brokers.map(_.getNode(listenerName))
 
     val controllerContext = new ControllerContext(zkUtils)
     controllerContext.liveBrokers = brokers.toSet

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 135e04b..f056476 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -54,7 +54,7 @@ class LogOffsetTest extends ZooKeeperTestHarness {
     logDir = new File(logDirPath)
     time = new MockTime()
     server = TestUtils.createServer(KafkaConfig.fromProps(config), time)
-    simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 
1000000, 64*1024, "")
+    simpleConsumer = new SimpleConsumer("localhost", 
TestUtils.boundPort(server), 1000000, 64*1024, "")
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 43ff785..99a95ad 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -21,6 +21,7 @@ import util.Arrays.asList
 
 import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, 
EndPoint}
@@ -37,7 +38,7 @@ class MetadataCacheTest {
   def getTopicMetadataNonExistingTopics() {
     val topic = "topic"
     val cache = new MetadataCache(1)
-    val topicMetadata = cache.getTopicMetadata(Set(topic), 
SecurityProtocol.PLAINTEXT)
+    val topicMetadata = cache.getTopicMetadata(Set(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     assertTrue(topicMetadata.isEmpty)
   }
 
@@ -51,16 +52,16 @@ class MetadataCacheTest {
     val controllerId = 2
     val controllerEpoch = 1
 
-    def securityProtocolToEndPoint(brokerId: Int): Map[SecurityProtocol, 
EndPoint] = {
+    def endPoints(brokerId: Int): Seq[EndPoint] = {
       val host = s"foo-$brokerId"
-      Map(
-        SecurityProtocol.PLAINTEXT -> new EndPoint(host, 9092),
-        SecurityProtocol.SSL -> new EndPoint(host, 9093)
+      Seq(
+        new EndPoint(host, 9092, SecurityProtocol.PLAINTEXT, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
+        new EndPoint(host, 9093, SecurityProtocol.SSL, 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
       )
     }
 
     val brokers = (0 to 2).map { brokerId =>
-      new Broker(brokerId, securityProtocolToEndPoint(brokerId).asJava, 
"rack1")
+      new Broker(brokerId, endPoints(brokerId).asJava, "rack1")
     }.toSet
 
     val partitionStates = Map(
@@ -73,7 +74,8 @@ class MetadataCacheTest {
     cache.updateCache(15, updateMetadataRequest)
 
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, 
SecurityProtocol.SSL)) {
-      val topicMetadatas = cache.getTopicMetadata(Set(topic), securityProtocol)
+      val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+      val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
       assertEquals(1, topicMetadatas.size)
 
       val topicMetadata = topicMetadatas.head
@@ -89,7 +91,7 @@ class MetadataCacheTest {
         assertEquals(i, partitionMetadata.partition)
         val leader = partitionMetadata.leader
         assertEquals(i, leader.id)
-        val endPoint = 
securityProtocolToEndPoint(partitionMetadata.leader.id)(securityProtocol)
+        val endPoint = 
endPoints(partitionMetadata.leader.id).find(_.listenerName == listenerName).get
         assertEquals(endPoint.host, leader.host)
         assertEquals(endPoint.port, leader.port)
         assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id))
@@ -109,7 +111,9 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new 
EndPoint("foo", 9092)).asJava, null))
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, 
securityProtocol, listenerName)).asJava, null))
 
     val leader = 1
     val leaderEpoch = 1
@@ -120,7 +124,7 @@ class MetadataCacheTest {
       controllerId, controllerEpoch, partitionStates.asJava, 
brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
 
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), 
SecurityProtocol.PLAINTEXT)
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -146,7 +150,9 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new 
EndPoint("foo", 9092)).asJava, null))
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, 
securityProtocol, listenerName)).asJava, null))
 
     // replica 1 is not available
     val leader = 0
@@ -162,7 +168,7 @@ class MetadataCacheTest {
     cache.updateCache(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), 
SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false)
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, 
errorUnavailableEndpoints = false)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -178,7 +184,7 @@ class MetadataCacheTest {
     assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet)
 
     // Validate errorUnavailableEndpoints = true
-    val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), 
SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true)
+    val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), 
listenerName, errorUnavailableEndpoints = true)
     assertEquals(1, topicMetadatasWithError.size)
 
     val topicMetadataWithError = topicMetadatasWithError.head
@@ -203,7 +209,9 @@ class MetadataCacheTest {
     val zkVersion = 3
     val controllerId = 2
     val controllerEpoch = 1
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new 
EndPoint("foo", 9092)).asJava, "rack1"))
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+    val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, 
securityProtocol, listenerName)).asJava, "rack1"))
 
     // replica 1 is not available
     val leader = 0
@@ -219,7 +227,7 @@ class MetadataCacheTest {
     cache.updateCache(15, updateMetadataRequest)
 
     // Validate errorUnavailableEndpoints = false
-    val topicMetadatas = cache.getTopicMetadata(Set(topic), 
SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false)
+    val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, 
errorUnavailableEndpoints = false)
     assertEquals(1, topicMetadatas.size)
 
     val topicMetadata = topicMetadatas.head
@@ -235,7 +243,7 @@ class MetadataCacheTest {
     assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet)
 
     // Validate errorUnavailableEndpoints = true
-    val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), 
SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true)
+    val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), 
listenerName, errorUnavailableEndpoints = true)
     assertEquals(1, topicMetadatasWithError.size)
 
     val topicMetadataWithError = topicMetadatasWithError.head
@@ -255,7 +263,9 @@ class MetadataCacheTest {
   def getTopicMetadataWithNonSupportedSecurityProtocol() {
     val topic = "topic"
     val cache = new MetadataCache(1)
-    val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new 
EndPoint("foo", 9092)).asJava, ""))
+    val securityProtocol = SecurityProtocol.PLAINTEXT
+    val brokers = Set(new Broker(0,
+      Seq(new EndPoint("foo", 9092, securityProtocol, 
ListenerName.forSecurityProtocol(securityProtocol))).asJava, ""))
     val controllerEpoch = 1
     val leader = 0
     val leaderEpoch = 0
@@ -268,7 +278,7 @@ class MetadataCacheTest {
     cache.updateCache(15, updateMetadataRequest)
 
     try {
-      val result = cache.getTopicMetadata(Set(topic), SecurityProtocol.SSL)
+      val result = cache.getTopicMetadata(Set(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
       fail(s"Exception should be thrown by `getTopicMetadata` with 
non-supported SecurityProtocol, $result was returned instead")
     }
     catch {
@@ -284,7 +294,9 @@ class MetadataCacheTest {
 
     def updateCache(brokerIds: Set[Int]) {
       val brokers = brokerIds.map { brokerId =>
-        new Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new 
EndPoint("foo", 9092)).asJava, "")
+        val securityProtocol = SecurityProtocol.PLAINTEXT
+        new Broker(brokerId, Seq(
+          new EndPoint("foo", 9092, securityProtocol, 
ListenerName.forSecurityProtocol(securityProtocol))).asJava, "")
       }
       val controllerEpoch = 1
       val leader = 0

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index e46c41f..aef29bc 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -52,7 +52,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     val logDirPath = config.getProperty("log.dir")
     logDir = new File(logDirPath)
     server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM)
-    simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 
1000000, 64*1024, "test-client")
+    simpleConsumer = new SimpleConsumer("localhost", 
TestUtils.boundPort(server), 1000000, 64*1024, "test-client")
     val consumerMetadataRequest = GroupCoordinatorRequest(group)
     Stream.continually {
       val consumerMetadataResponse = 
simpleConsumer.send(consumerMetadataRequest)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index bcf0a9c..00959f1 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -21,8 +21,8 @@ import java.io.File
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
 import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
+import TestUtils.createBroker
 import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -138,7 +138,7 @@ class ReplicaManagerTest {
         fetchCallbackFired = true
       }
 
-      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 
1))
+      val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, 
"host1", 1))
       val metadataCache = EasyMock.createMock(classOf[MetadataCache])
       
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
       EasyMock.replay(metadataCache)
@@ -196,7 +196,7 @@ class ReplicaManagerTest {
     val rm = new ReplicaManager(config, metrics, time, zkUtils, new 
MockScheduler(time), mockLogMgr,
       new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, 
time).follower, Option(this.getClass.getName))
     try {
-      val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 
1), new Broker(1, "host2", 2))
+      val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, 
"host1", 1), createBroker(1, "host2", 2))
       val metadataCache = EasyMock.createMock(classOf[MetadataCache])
       
EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
       EasyMock.replay(metadataCache)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index fd0a460..66845c1 100755
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -81,7 +81,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0)
 
     producer = createProducer(server)
-    val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 
64*1024, "")
+    val consumer = new SimpleConsumer(host, TestUtils.boundPort(server), 
1000000, 64*1024, "")
 
     var fetchedMessage: ByteBufferMessageSet = null
     while (fetchedMessage == null || fetchedMessage.validBytes == 0) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 92c6a9b..ac757d0 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -47,7 +47,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
     val brokerId1 = 0
     val props1 = TestUtils.createBrokerConfig(brokerId1, zkConnect)
     val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1))
-    val port = server1.boundPort()
+    val port = TestUtils.boundPort(server1)
 
     // Create a second broker with same port
     val brokerId2 = 1

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala 
b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
index 5f466c5..fda17c0 100644
--- a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala
@@ -54,7 +54,7 @@ class SessionExpireListenerTest {
     val zkClient = EasyMock.mock(classOf[ZkClient])
     val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
     import Watcher._
-    val healthcheck = new KafkaHealthcheck(brokerId, Map.empty, zkUtils, None, 
ApiVersion.latestVersion)
+    val healthcheck = new KafkaHealthcheck(brokerId, Seq.empty, zkUtils, None, 
ApiVersion.latestVersion)
 
     val expiresPerSecName = "ZooKeeperExpiresPerSec"
     val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec"

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 24ec1c2..c530e07 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, 
ProducerConfig, Produce
 import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor}
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.network.Mode
+import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
 import org.apache.kafka.common.utils.Time
@@ -125,6 +125,12 @@ object TestUtils extends Logging {
     server
   }
 
+  def boundPort(server: KafkaServer, securityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT): Int =
+    server.boundPort(ListenerName.forSecurityProtocol(securityProtocol))
+
+  def createBroker(id: Int, host: String, port: Int, securityProtocol: 
SecurityProtocol = SecurityProtocol.PLAINTEXT): Broker =
+    new Broker(id, host, port, 
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol)
+
   /**
    * Create a test config for the provided parameters.
    *
@@ -150,7 +156,19 @@ object TestUtils extends Logging {
   }
 
   def getBrokerListStrFromServers(servers: Seq[KafkaServer], protocol: 
SecurityProtocol = SecurityProtocol.PLAINTEXT): String = {
-    servers.map(s => formatAddress(s.config.hostName, 
s.boundPort(protocol))).mkString(",")
+    servers.map { s =>
+      val listener = s.config.advertisedListeners.find(_.securityProtocol == 
protocol).getOrElse(
+        sys.error(s"Could not find listener with security protocol $protocol"))
+      formatAddress(listener.host, boundPort(s, protocol))
+    }.mkString(",")
+  }
+
+  def bootstrapServers(servers: Seq[KafkaServer], listenerName: ListenerName): 
String = {
+    servers.map { s =>
+      val listener = s.config.advertisedListeners.find(_.listenerName == 
listenerName).getOrElse(
+        sys.error(s"Could not find listener with name $listenerName"))
+      formatAddress(listener.host, s.boundPort(listenerName))
+    }.mkString(",")
   }
 
   /**
@@ -596,7 +614,8 @@ object TestUtils extends Logging {
   def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], 
zkUtils: ZkUtils): Seq[Broker] = {
     val brokers = brokerMetadatas.map { b =>
       val protocol = SecurityProtocol.PLAINTEXT
-      Broker(b.id, Map(protocol -> EndPoint("localhost", 6667, 
protocol)).toMap, b.rack)
+      val listenerName = ListenerName.forSecurityProtocol(protocol)
+      Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), 
b.rack)
     }
     brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, 
b.endPoints, jmxPort = -1,
       rack = b.rack, ApiVersion.latestVersion))
@@ -604,7 +623,7 @@ object TestUtils extends Logging {
   }
 
   def deleteBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = {
-    val brokers = ids.map(id => new Broker(id, "localhost", 6667, 
SecurityProtocol.PLAINTEXT))
+    val brokers = ids.map(createBroker(_, "localhost", 6667, 
SecurityProtocol.PLAINTEXT))
     brokers.foreach(b => zkUtils.deletePath(ZkUtils.BrokerIdsPath + "/" + b))
     brokers
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
index ac9b670..70c5063 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
@@ -29,6 +29,7 @@ import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
@@ -110,7 +111,7 @@ public class KafkaEmbedded {
      * You can use this to tell Kafka producers and consumers how to connect 
to this instance.
      */
     public String brokerList() {
-        return kafka.config().hostName() + ":" + 
kafka.boundPort(SecurityProtocol.PLAINTEXT);
+        return kafka.config().hostName() + ":" + 
kafka.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT));
     }
 
 

Reply via email to