Repository: kafka Updated Branches: refs/heads/trunk 80d6c64c1 -> d25671884
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala index dc17aa4..8d4899b 100755 --- a/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala @@ -18,41 +18,85 @@ package kafka.server import org.junit.Assert._ -import kafka.utils.{TestUtils, CoreUtils, ZkUtils} +import kafka.utils.{CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.{Test, After, Before} +import org.junit.{After, Test} + +import scala.collection.mutable.ArrayBuffer class AdvertiseBrokerTest extends ZooKeeperTestHarness { - var server : KafkaServer = null + val servers = ArrayBuffer[KafkaServer]() + val brokerId = 0 - val advertisedHostName = "routable-host" - val advertisedPort = 1234 - @Before - override def setUp() { - super.setUp() + /** Shuts down every server a test added to `servers`, deletes each one's configured log dirs, then runs the harness tearDown. */ + @After + override def tearDown() { + servers.foreach { s => + s.shutdown() + CoreUtils.delete(s.config.logDirs) + } + super.tearDown() + } + /** With advertised.host.name and advertised.port set, the broker info read through zkUtils holds exactly one PLAINTEXT endpoint with that host and port. */ + @Test + def testBrokerAdvertiseHostNameAndPortToZK: Unit = { + val advertisedHostName = "routable-host1" + val advertisedPort = 1234 val props = TestUtils.createBrokerConfig(brokerId, zkConnect) props.put("advertised.host.name", advertisedHostName) props.put("advertised.port", advertisedPort.toString) + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) - server = TestUtils.createServer(KafkaConfig.fromProps(props)) + val brokerInfo = zkUtils.getBrokerInfo(brokerId).get + assertEquals(1, brokerInfo.endPoints.size) + val endpoint = brokerInfo.endPoints.head + assertEquals(advertisedHostName, endpoint.host) + assertEquals(advertisedPort, endpoint.port) + 
assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol) + assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value) } - @After - override def tearDown() { - server.shutdown() - CoreUtils.delete(server.config.logDirs) - super.tearDown() + /** With a single PLAINTEXT advertised.listeners entry, the broker info read through zkUtils holds exactly that endpoint. */ + @Test + def testBrokerAdvertiseListenersToZK: Unit = { + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) + props.put("advertised.listeners", "PLAINTEXT://routable-listener:3334") + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) + + val brokerInfo = zkUtils.getBrokerInfo(brokerId).get + assertEquals(1, brokerInfo.endPoints.size) + val endpoint = brokerInfo.endPoints.head + assertEquals("routable-listener", endpoint.host) + assertEquals(3334, endpoint.port) + assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol) + assertEquals(SecurityProtocol.PLAINTEXT.name, endpoint.listenerName.value) } - @Test - def testBrokerAdvertiseToZK { - val brokerInfo = zkUtils.getBrokerInfo(brokerId) - val endpoint = brokerInfo.get.endPoints.get(SecurityProtocol.PLAINTEXT).get - assertEquals(advertisedHostName, endpoint.host) - assertEquals(advertisedPort, endpoint.port) + /** With custom listener names, both advertised listeners (EXTERNAL first, then INTERNAL) are expected in the broker info read through zkUtils. */ + @Test + def testBrokerAdvertiseListenersWithCustomNamesToZK: Unit = { + val props = TestUtils.createBrokerConfig(brokerId, zkConnect) + props.put("listeners", "INTERNAL://:0,EXTERNAL://:0") + props.put("advertised.listeners", "EXTERNAL://external-listener:9999,INTERNAL://internal-listener:10999") + props.put("listener.security.protocol.map", "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT") + props.put("inter.broker.listener.name", "INTERNAL") + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) + + val brokerInfo = zkUtils.getBrokerInfo(brokerId).get + assertEquals(2, brokerInfo.endPoints.size) + val endpoint = brokerInfo.endPoints.head + assertEquals("external-listener", endpoint.host) + assertEquals(9999, endpoint.port) + assertEquals(SecurityProtocol.PLAINTEXT, endpoint.securityProtocol) + assertEquals("EXTERNAL", 
endpoint.listenerName.value) + val endpoint2 = brokerInfo.endPoints(1) + assertEquals("internal-listener", endpoint2.host) + assertEquals(10999, endpoint2.port) + assertEquals(SecurityProtocol.PLAINTEXT, endpoint2.securityProtocol) + assertEquals("INTERNAL", endpoint2.listenerName.value) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 5c29935..6d3374f 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -25,6 +25,7 @@ import java.util.Properties import kafka.integration.KafkaServerTestHarness import kafka.network.SocketServer import kafka.utils._ +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, SecurityProtocol} import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader} import org.junit.Before @@ -73,7 +74,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { } def connect(s: SocketServer = anySocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = { - new Socket("localhost", s.boundPort(protocol)) + new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol))) } private def sendRequest(socket: Socket, request: Array[Byte]) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala index d3dbfe2..47a05ef 100644 --- 
a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala @@ -40,7 +40,7 @@ class ControlledShutdownLeaderSelectorTest { val zkUtils = EasyMock.mock(classOf[ZkUtils]) val controllerContext = new ControllerContext(zkUtils) - controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet + controllerContext.liveBrokers = assignment.map(Broker(_, Seq.empty, None)).toSet controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3) controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala index 2857fc7..5c53ffa 100755 --- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala @@ -25,6 +25,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.network.SocketServer import kafka.utils._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.types.Type import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.record.MemoryRecords @@ -45,7 +46,7 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { private def socketServer = servers.head.socketServer private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = { - new Socket("localhost", s.boundPort(protocol)) + new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol))) } private def sendRequest(socket: Socket, request: Array[Byte], id: 
Option[Short] = None) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index d5e2ce3..c3fc600 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -20,9 +20,11 @@ package kafka.server import java.util.Properties import kafka.api.{ApiVersion, KAFKA_0_8_2} +import kafka.cluster.EndPoint import kafka.message._ import kafka.utils.{CoreUtils, TestUtils} import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Assert._ import org.junit.Test @@ -84,6 +86,7 @@ class KafkaConfigTest { val cfg = KafkaConfig.fromProps(props) assertEquals( 30 * 60L * 1000L, cfg.logRetentionTimeMillis) } + @Test def testLogRetentionUnlimited() { val props1 = TestUtils.createBrokerConfig(0,TestUtils.MockZkConnect, port = 8181) @@ -149,7 +152,7 @@ class KafkaConfigTest { props.put(KafkaConfig.PortProp, port) val serverConfig = KafkaConfig.fromProps(props) val endpoints = serverConfig.advertisedListeners - val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get + val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, hostName) assertEquals(endpoint.port, port.toInt) } @@ -165,7 +168,7 @@ class KafkaConfigTest { val serverConfig = KafkaConfig.fromProps(props) val endpoints = serverConfig.advertisedListeners - val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get + val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, advertisedHostName) assertEquals(endpoint.port, advertisedPort.toInt) @@ 
-182,7 +185,7 @@ class KafkaConfigTest { val serverConfig = KafkaConfig.fromProps(props) val endpoints = serverConfig.advertisedListeners - val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get + val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, advertisedHostName) assertEquals(endpoint.port, port.toInt) @@ -199,7 +202,7 @@ class KafkaConfigTest { val serverConfig = KafkaConfig.fromProps(props) val endpoints = serverConfig.advertisedListeners - val endpoint = endpoints.get(SecurityProtocol.PLAINTEXT).get + val endpoint = endpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get assertEquals(endpoint.host, hostName) assertEquals(endpoint.port, advertisedPort.toInt) @@ -213,15 +216,15 @@ class KafkaConfigTest { // listeners with duplicate port props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9091") - assert(!isValidKafkaConfig(props)) + assertFalse(isValidKafkaConfig(props)) // listeners with duplicate protocol props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092") - assert(!isValidKafkaConfig(props)) + assertFalse(isValidKafkaConfig(props)) // advertised listeners with duplicate port props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9091") - assert(!isValidKafkaConfig(props)) + assertFalse(isValidKafkaConfig(props)) } @Test @@ -231,7 +234,96 @@ class KafkaConfigTest { props.put(KafkaConfig.ZkConnectProp, "localhost:2181") props.put(KafkaConfig.ListenersProp, "BAD://localhost:9091") - assert(!isValidKafkaConfig(props)) + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testListenerNamesWithAdvertisedListenerUnset(): Unit = { + val props = new Properties() + props.put(KafkaConfig.BrokerIdProp, "1") + props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + + props.put(KafkaConfig.ListenersProp, 
"CLIENT://localhost:9091,REPLICATION://localhost:9092,INTERNAL://localhost:9093") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CLIENT:SSL,REPLICATION:SSL,INTERNAL:PLAINTEXT") + props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION") + val config = KafkaConfig.fromProps(props) + val expectedListeners = Seq( + EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL), + EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL), + EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)) + assertEquals(expectedListeners, config.listeners) + assertEquals(expectedListeners, config.advertisedListeners) + val expectedSecurityProtocolMap = Map( + new ListenerName("CLIENT") -> SecurityProtocol.SSL, + new ListenerName("REPLICATION") -> SecurityProtocol.SSL, + new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT + ) + assertEquals(expectedSecurityProtocolMap, config.listenerSecurityProtocolMap) + } + + @Test + def testListenerAndAdvertisedListenerNames(): Unit = { + val props = new Properties() + props.put(KafkaConfig.BrokerIdProp, "1") + props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + + props.put(KafkaConfig.ListenersProp, "EXTERNAL://localhost:9091,INTERNAL://localhost:9093") + props.put(KafkaConfig.AdvertisedListenersProp, "EXTERNAL://lb1.example.com:9000,INTERNAL://host1:9093") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "EXTERNAL:SSL,INTERNAL:PLAINTEXT") + props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL") + val config = KafkaConfig.fromProps(props) + + val expectedListeners = Seq( + EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), + EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) + ) + assertEquals(expectedListeners, config.listeners) + + val expectedAdvertisedListeners = Seq( + EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL), + 
EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT) + ) + assertEquals(expectedAdvertisedListeners, config.advertisedListeners) + + val expectedSecurityProtocolMap = Map( + new ListenerName("EXTERNAL") -> SecurityProtocol.SSL, + new ListenerName("INTERNAL") -> SecurityProtocol.PLAINTEXT + ) + assertEquals(expectedSecurityProtocolMap, config.listenerSecurityProtocolMap) + } + + @Test + def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { + val props = new Properties() + props.put(KafkaConfig.BrokerIdProp, "1") + props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + + props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091,REPLICATION://localhost:9092") + props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL") + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { + val props = new Properties() + props.put(KafkaConfig.BrokerIdProp, "1") + props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + + props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091") + props.put(KafkaConfig.InterBrokerListenerNameProp, "REPLICATION") + assertFalse(isValidKafkaConfig(props)) + } + + @Test + def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = { + val props = new Properties() + props.put(KafkaConfig.BrokerIdProp, "1") + props.put(KafkaConfig.ZkConnectProp, "localhost:2181") + + props.put(KafkaConfig.ListenersProp, "SSL://localhost:9091") + props.put(KafkaConfig.InterBrokerListenerNameProp, "SSL") + props.put(KafkaConfig.InterBrokerSecurityProtocolProp, "SSL") + assertFalse(isValidKafkaConfig(props)) } @Test @@ -240,10 +332,15 @@ class KafkaConfigTest { props.put(KafkaConfig.BrokerIdProp, "1") props.put(KafkaConfig.ZkConnectProp, "localhost:2181") props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092") - - assert(isValidKafkaConfig(props)) + val config = KafkaConfig.fromProps(props) + 
assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(_.connectionString)) + assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(_.connectionString)) } + def listenerListToEndPoints(listenerList: String, + securityProtocolMap: collection.Map[ListenerName, SecurityProtocol] = EndPoint.DefaultSecurityProtocolMap) = + CoreUtils.listenerListToEndPoints(listenerList, securityProtocolMap) + @Test def testListenerDefaults() { val props = new Properties() @@ -255,22 +352,22 @@ class KafkaConfigTest { props.put(KafkaConfig.PortProp, "1111") val conf = KafkaConfig.fromProps(props) - assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners) + assertEquals(listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners) // configuration with null host props.remove(KafkaConfig.HostNameProp) val conf2 = KafkaConfig.fromProps(props) - assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners) - assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners) - assertEquals(null, conf2.listeners(SecurityProtocol.PLAINTEXT).host) + assertEquals(listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners) + assertEquals(listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners) + assertEquals(null, conf2.listeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get.host) // configuration with advertised host and port, and no advertised listeners props.put(KafkaConfig.AdvertisedHostNameProp, "otherhost") props.put(KafkaConfig.AdvertisedPortProp, "2222") val conf3 = KafkaConfig.fromProps(props) - assertEquals(conf3.advertisedListeners, CoreUtils.listenerListToEndPoints("PLAINTEXT://otherhost:2222")) + assertEquals(conf3.advertisedListeners, listenerListToEndPoints("PLAINTEXT://otherhost:2222")) } @Test @@ -295,7 +392,7 @@ class KafkaConfigTest { 
assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion) //check that latest is newer than 0.8.2 - assert(ApiVersion.latestVersion >= conf3.interBrokerProtocolVersion) + assertTrue(ApiVersion.latestVersion >= conf3.interBrokerProtocolVersion) } private def isValidKafkaConfig(props: Properties): Boolean = { @@ -303,7 +400,7 @@ class KafkaConfigTest { KafkaConfig.fromProps(props) true } catch { - case _: IllegalArgumentException => false + case _: IllegalArgumentException | _: ConfigException => false } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 7e85c19..0ceb71b 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -29,6 +29,7 @@ import kafka.controller.{ControllerChannelManager, ControllerContext} import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.utils.Time import org.junit.{After, Before, Test} @@ -128,8 +129,11 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val controllerId = 2 val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, zkConnect)) - val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())) - val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT)) + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", 
TestUtils.boundPort(s), listenerName, + securityProtocol)) + val nodes = brokers.map(_.getNode(listenerName)) val controllerContext = new ControllerContext(zkUtils) controllerContext.liveBrokers = brokers.toSet http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 135e04b..f056476 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -54,7 +54,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { logDir = new File(logDirPath) time = new MockTime() server = TestUtils.createServer(KafkaConfig.fromProps(config), time) - simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "") + simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "") } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 43ff785..99a95ad 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -21,6 +21,7 @@ import util.Arrays.asList import kafka.common.BrokerEndPointNotAvailableException import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest} import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, 
EndPoint} @@ -37,7 +38,7 @@ class MetadataCacheTest { def getTopicMetadataNonExistingTopics() { val topic = "topic" val cache = new MetadataCache(1) - val topicMetadata = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + val topicMetadata = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) assertTrue(topicMetadata.isEmpty) } @@ -51,16 +52,16 @@ class MetadataCacheTest { val controllerId = 2 val controllerEpoch = 1 - def securityProtocolToEndPoint(brokerId: Int): Map[SecurityProtocol, EndPoint] = { + def endPoints(brokerId: Int): Seq[EndPoint] = { val host = s"foo-$brokerId" - Map( - SecurityProtocol.PLAINTEXT -> new EndPoint(host, 9092), - SecurityProtocol.SSL -> new EndPoint(host, 9093) + Seq( + new EndPoint(host, 9092, SecurityProtocol.PLAINTEXT, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)), + new EndPoint(host, 9093, SecurityProtocol.SSL, ListenerName.forSecurityProtocol(SecurityProtocol.SSL)) ) } val brokers = (0 to 2).map { brokerId => - new Broker(brokerId, securityProtocolToEndPoint(brokerId).asJava, "rack1") + new Broker(brokerId, endPoints(brokerId).asJava, "rack1") }.toSet val partitionStates = Map( @@ -73,7 +74,8 @@ class MetadataCacheTest { cache.updateCache(15, updateMetadataRequest) for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) { - val topicMetadatas = cache.getTopicMetadata(Set(topic), securityProtocol) + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -89,7 +91,7 @@ class MetadataCacheTest { assertEquals(i, partitionMetadata.partition) val leader = partitionMetadata.leader assertEquals(i, leader.id) - val endPoint = securityProtocolToEndPoint(partitionMetadata.leader.id)(securityProtocol) + val endPoint = endPoints(partitionMetadata.leader.id).find(_.listenerName == 
listenerName).get assertEquals(endPoint.host, leader.host) assertEquals(endPoint.port, leader.port) assertEquals(List(i), partitionMetadata.isr.asScala.map(_.id)) @@ -109,7 +111,9 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null)) + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null)) val leader = 1 val leaderEpoch = 1 @@ -120,7 +124,7 @@ class MetadataCacheTest { controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) - val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT) + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -146,7 +150,9 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, null)) + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, null)) // replica 1 is not available val leader = 0 @@ -162,7 +168,7 @@ class MetadataCacheTest { cache.updateCache(15, updateMetadataRequest) // Validate errorUnavailableEndpoints = false - val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false) + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = false) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head 
@@ -178,7 +184,7 @@ class MetadataCacheTest { assertEquals(Set(0), partitionMetadata.isr.asScala.map(_.id).toSet) // Validate errorUnavailableEndpoints = true - val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true) + val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true) assertEquals(1, topicMetadatasWithError.size) val topicMetadataWithError = topicMetadatasWithError.head @@ -203,7 +209,9 @@ class MetadataCacheTest { val zkVersion = 3 val controllerId = 2 val controllerEpoch = 1 - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "rack1")) + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokers = Set(new Broker(0, Seq(new EndPoint("foo", 9092, securityProtocol, listenerName)).asJava, "rack1")) // replica 1 is not available val leader = 0 @@ -219,7 +227,7 @@ class MetadataCacheTest { cache.updateCache(15, updateMetadataRequest) // Validate errorUnavailableEndpoints = false - val topicMetadatas = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = false) + val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = false) assertEquals(1, topicMetadatas.size) val topicMetadata = topicMetadatas.head @@ -235,7 +243,7 @@ class MetadataCacheTest { assertEquals(Set(0, 1), partitionMetadata.isr.asScala.map(_.id).toSet) // Validate errorUnavailableEndpoints = true - val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), SecurityProtocol.PLAINTEXT, errorUnavailableEndpoints = true) + val topicMetadatasWithError = cache.getTopicMetadata(Set(topic), listenerName, errorUnavailableEndpoints = true) assertEquals(1, topicMetadatasWithError.size) val topicMetadataWithError = topicMetadatasWithError.head @@ -255,7 +263,9 @@ class 
MetadataCacheTest { def getTopicMetadataWithNonSupportedSecurityProtocol() { val topic = "topic" val cache = new MetadataCache(1) - val brokers = Set(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "")) + val securityProtocol = SecurityProtocol.PLAINTEXT + val brokers = Set(new Broker(0, + Seq(new EndPoint("foo", 9092, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))).asJava, "")) val controllerEpoch = 1 val leader = 0 val leaderEpoch = 0 @@ -268,7 +278,7 @@ class MetadataCacheTest { cache.updateCache(15, updateMetadataRequest) try { - val result = cache.getTopicMetadata(Set(topic), SecurityProtocol.SSL) + val result = cache.getTopicMetadata(Set(topic), ListenerName.forSecurityProtocol(SecurityProtocol.SSL)) fail(s"Exception should be thrown by `getTopicMetadata` with non-supported SecurityProtocol, $result was returned instead") } catch { @@ -284,7 +294,9 @@ class MetadataCacheTest { def updateCache(brokerIds: Set[Int]) { val brokers = brokerIds.map { brokerId => - new Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new EndPoint("foo", 9092)).asJava, "") + val securityProtocol = SecurityProtocol.PLAINTEXT + new Broker(brokerId, Seq( + new EndPoint("foo", 9092, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))).asJava, "") } val controllerEpoch = 1 val leader = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala index e46c41f..aef29bc 100755 --- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala +++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala @@ -52,7 +52,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness { val logDirPath = config.getProperty("log.dir") logDir = new 
File(logDirPath) server = TestUtils.createServer(KafkaConfig.fromProps(config), Time.SYSTEM) - simpleConsumer = new SimpleConsumer("localhost", server.boundPort(), 1000000, 64*1024, "test-client") + simpleConsumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 1000000, 64*1024, "test-client") val consumerMetadataRequest = GroupCoordinatorRequest(group) Stream.continually { val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index bcf0a9c..00959f1 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -21,8 +21,8 @@ import java.io.File import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.Broker -import kafka.common.TopicAndPartition import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils} +import TestUtils.createBroker import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors @@ -138,7 +138,7 @@ class ReplicaManagerTest { fetchCallbackFired = true } - val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1)) + val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.replay(metadataCache) @@ -196,7 +196,7 @@ class ReplicaManagerTest { val rm = new ReplicaManager(config, metrics, time, zkUtils, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time).follower, 
Option(this.getClass.getName)) try { - val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1), new Broker(1, "host2", 2)) + val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1), createBroker(1, "host2", 2)) val metadataCache = EasyMock.createMock(classOf[MetadataCache]) EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.replay(metadataCache) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index fd0a460..66845c1 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -81,7 +81,7 @@ class ServerShutdownTest extends ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0) producer = createProducer(server) - val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "") + val consumer = new SimpleConsumer(host, TestUtils.boundPort(server), 1000000, 64*1024, "") var fetchedMessage: ByteBufferMessageSet = null while (fetchedMessage == null || fetchedMessage.validBytes == 0) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 92c6a9b..ac757d0 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -47,7 +47,7 @@ class ServerStartupTest extends ZooKeeperTestHarness { val brokerId1 = 0 val props1 = 
TestUtils.createBrokerConfig(brokerId1, zkConnect) val server1 = TestUtils.createServer(KafkaConfig.fromProps(props1)) - val port = server1.boundPort() + val port = TestUtils.boundPort(server1) // Create a second broker with same port val brokerId2 = 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala index 5f466c5..fda17c0 100644 --- a/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala +++ b/core/src/test/scala/unit/kafka/server/SessionExpireListenerTest.scala @@ -54,7 +54,7 @@ class SessionExpireListenerTest { val zkClient = EasyMock.mock(classOf[ZkClient]) val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false) import Watcher._ - val healthcheck = new KafkaHealthcheck(brokerId, Map.empty, zkUtils, None, ApiVersion.latestVersion) + val healthcheck = new KafkaHealthcheck(brokerId, Seq.empty, zkUtils, None, ApiVersion.latestVersion) val expiresPerSecName = "ZooKeeperExpiresPerSec" val disconnectsPerSecName = "ZooKeeperDisconnectsPerSec" http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 24ec1c2..c530e07 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -48,7 +48,7 @@ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produce import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.CommonClientConfigs import 
org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.network.Mode +import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.record._ import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer} import org.apache.kafka.common.utils.Time @@ -125,6 +125,12 @@ object TestUtils extends Logging { server } + def boundPort(server: KafkaServer, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = + server.boundPort(ListenerName.forSecurityProtocol(securityProtocol)) + + def createBroker(id: Int, host: String, port: Int, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Broker = + new Broker(id, host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) + /** * Create a test config for the provided parameters. * @@ -150,7 +156,19 @@ object TestUtils extends Logging { } def getBrokerListStrFromServers(servers: Seq[KafkaServer], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = { - servers.map(s => formatAddress(s.config.hostName, s.boundPort(protocol))).mkString(",") + servers.map { s => + val listener = s.config.advertisedListeners.find(_.securityProtocol == protocol).getOrElse( + sys.error(s"Could not find listener with security protocol $protocol")) + formatAddress(listener.host, boundPort(s, protocol)) + }.mkString(",") + } + + def bootstrapServers(servers: Seq[KafkaServer], listenerName: ListenerName): String = { + servers.map { s => + val listener = s.config.advertisedListeners.find(_.listenerName == listenerName).getOrElse( + sys.error(s"Could not find listener with name $listenerName")) + formatAddress(listener.host, s.boundPort(listenerName)) + }.mkString(",") } /** @@ -596,7 +614,8 @@ object TestUtils extends Logging { def createBrokersInZk(brokerMetadatas: Seq[kafka.admin.BrokerMetadata], zkUtils: ZkUtils): Seq[Broker] = { val brokers = brokerMetadatas.map { b => val protocol = SecurityProtocol.PLAINTEXT - Broker(b.id, 
Map(protocol -> EndPoint("localhost", 6667, protocol)).toMap, b.rack) + val listenerName = ListenerName.forSecurityProtocol(protocol) + Broker(b.id, Seq(EndPoint("localhost", 6667, listenerName, protocol)), b.rack) } brokers.foreach(b => zkUtils.registerBrokerInZk(b.id, "localhost", 6667, b.endPoints, jmxPort = -1, rack = b.rack, ApiVersion.latestVersion)) @@ -604,7 +623,7 @@ object TestUtils extends Logging { } def deleteBrokersInZk(zkUtils: ZkUtils, ids: Seq[Int]): Seq[Broker] = { - val brokers = ids.map(id => new Broker(id, "localhost", 6667, SecurityProtocol.PLAINTEXT)) + val brokers = ids.map(createBroker(_, "localhost", 6667, SecurityProtocol.PLAINTEXT)) brokers.foreach(b => zkUtils.deletePath(ZkUtils.BrokerIdsPath + "/" + b)) brokers } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java index ac9b670..70c5063 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -29,6 +29,7 @@ import kafka.utils.ZKStringSerializer$; import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.SecurityProtocol; import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; @@ -110,7 +111,7 @@ public class KafkaEmbedded { * You can use this to tell Kafka producers and consumers how to connect to this instance. 
*/ public String brokerList() { - return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT); + return kafka.config().hostName() + ":" + kafka.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)); }