http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 73fee6c..7f0befe 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -27,9 +27,9 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, Message import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.ConfigDef.ValidList -import org.apache.kafka.common.config.SaslConfigs -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, SslConfigs} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs} import org.apache.kafka.common.metrics.{MetricsReporter, Sensor} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.record.TimestampType @@ -58,6 +58,11 @@ object Defaults { /** ********* Socket Server Configuration ***********/ val Port = 9092 val HostName: String = new String("") + + val ListenerSecurityProtocolMap: String = EndPoint.DefaultSecurityProtocolMap.map { case (listenerName, securityProtocol) => + s"${listenerName.value}:${securityProtocol.name}" + }.mkString(",") + val SocketSendBufferBytes: Int = 100 * 1024 val SocketReceiveBufferBytes: Int = 100 * 1024 val SocketRequestMaxBytes: Int = 100 * 1024 * 1024 @@ -223,6 +228,7 @@ object KafkaConfig { val AdvertisedHostNameProp: String = "advertised.host.name" val AdvertisedPortProp = "advertised.port" val AdvertisedListenersProp = "advertised.listeners" + val ListenerSecurityProtocolMapProp = "listener.security.protocol.map" val SocketSendBufferBytesProp = "socket.send.buffer.bytes" val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" val SocketRequestMaxBytesProp = "socket.request.max.bytes" @@ -295,6 +301,7 @@ object KafkaConfig { val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol" val InterBrokerProtocolVersionProp = "inter.broker.protocol.version" + val InterBrokerListenerNameProp = "inter.broker.listener.name" /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" @@ -387,12 +394,13 @@ object KafkaConfig { val HostNameDoc = "DEPRECATED: only used when `listeners` is not set. " + "Use `listeners` instead. \n" + "hostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfaces" - val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and their protocols.\n" + + val ListenersDoc = "Listener List - Comma-separated list of URIs we will listen on and the listener names." + + s" If the listener name is not a security protocol, $ListenerSecurityProtocolMapProp must also be set.\n" + " Specify hostname as 0.0.0.0 to bind to all interfaces.\n" + " Leave hostname empty to bind to default interface.\n" + " Examples of legal listener lists:\n" + - " PLAINTEXT://myhost:9092,TRACE://:9091\n" + - " PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093\n" + " PLAINTEXT://myhost:9092,SSL://:9091\n" + + " CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093\n" val AdvertisedHostNameDoc = "DEPRECATED: only used when `advertised.listeners` or `listeners` are not set. " + "Use `advertised.listeners` instead. \n" + "Hostname to publish to ZooKeeper for clients to use. In IaaS environments, this may " + @@ -407,6 +415,12 @@ object KafkaConfig { val AdvertisedListenersDoc = "Listeners to publish to ZooKeeper for clients to use, if different than the listeners above." + " In IaaS environments, this may need to be different from the interface to which the broker binds." + " If this is not set, the value for `listeners` will be used." + val ListenerSecurityProtocolMapDoc = "Map between listener names and security protocols. This must be defined for " + + "the same security protocol to be usable in more than one port or IP. For example, we can separate internal and " + + "external traffic even if SSL is required for both. Concretely, we could define listeners with names INTERNAL " + + "and EXTERNAL and this property as: `INTERNAL:SSL,EXTERNAL:SSL`. As shown, key and value are separated by a colon " + + "and map entries are separated by commas. Each listener name should only appear once in the map." + val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." val SocketRequestMaxBytesDoc = "The maximum number of bytes in a socket request" @@ -504,10 +518,13 @@ object KafkaConfig { val LeaderImbalanceCheckIntervalSecondsDoc = "The frequency with which the partition rebalance check is triggered by the controller" val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss" val InterBrokerSecurityProtocolDoc = "Security protocol used to communicate between brokers. Valid values are: " + - s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}." + s"${SecurityProtocol.nonTestingValues.asScala.toSeq.map(_.name).mkString(", ")}. It is an error to set this and " + + s"$InterBrokerListenerNameProp properties at the same time." val InterBrokerProtocolVersionDoc = "Specify which version of the inter-broker protocol will be used.\n" + " This is typically bumped after all brokers were upgraded to a new version.\n" + " Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list." + val InterBrokerListenerNameDoc = s"Name of listener used for communication between brokers. If this is unset, the listener name is defined by $InterBrokerSecurityProtocolProp. " + + s"It is an error to set this and $InterBrokerSecurityProtocolProp properties at the same time." /** ********* Controlled shutdown configuration ***********/ val ControlledShutdownMaxRetriesDoc = "Controlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happens" val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." @@ -617,6 +634,7 @@ object KafkaConfig { .define(AdvertisedHostNameProp, STRING, null, HIGH, AdvertisedHostNameDoc) .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc) .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc) + .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc) .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc) .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc) .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc) @@ -692,6 +710,7 @@ object KafkaConfig { .define(UncleanLeaderElectionEnableProp, BOOLEAN, Defaults.UncleanLeaderElectionEnable, HIGH, UncleanLeaderElectionEnableDoc) .define(InterBrokerSecurityProtocolProp, STRING, Defaults.InterBrokerSecurityProtocol, MEDIUM, InterBrokerSecurityProtocolDoc) .define(InterBrokerProtocolVersionProp, STRING, Defaults.InterBrokerProtocolVersion, MEDIUM, InterBrokerProtocolVersionDoc) + .define(InterBrokerListenerNameProp, STRING, null, MEDIUM, InterBrokerListenerNameDoc) /** ********* Controlled shutdown configuration ***********/ .define(ControlledShutdownMaxRetriesProp, INT, Defaults.ControlledShutdownMaxRetries, MEDIUM, ControlledShutdownMaxRetriesDoc) @@ -885,7 +904,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val leaderImbalancePerBrokerPercentage = getInt(KafkaConfig.LeaderImbalancePerBrokerPercentageProp) val leaderImbalanceCheckIntervalSeconds = getLong(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp) val uncleanLeaderElectionEnable: java.lang.Boolean = getBoolean(KafkaConfig.UncleanLeaderElectionEnableProp) - val interBrokerSecurityProtocol = SecurityProtocol.forName(getString(KafkaConfig.InterBrokerSecurityProtocolProp)) + + val (interBrokerListenerName, interBrokerSecurityProtocol) = getInterBrokerListenerNameAndSecurityProtocol + // We keep the user-provided String as `ApiVersion.apply` can choose a slightly different version (eg if `0.10.0` // is passed, `0.10.0-IV0` may be picked) val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp) @@ -953,9 +974,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) val compressionType = getString(KafkaConfig.CompressionTypeProp) - - val listeners = getListeners - val advertisedListeners = getAdvertisedListeners + val listeners: Seq[EndPoint] = getListeners + val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners + private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L @@ -980,45 +1001,57 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra } } - private def validateUniquePortAndProtocol(listeners: String) { - - val endpoints = try { - val listenerList = CoreUtils.parseCsvList(listeners) - listenerList.map(listener => EndPoint.createEndPoint(listener)) - } catch { - case e: Exception => throw new IllegalArgumentException("Error creating broker listeners from '%s': %s".format(listeners, e.getMessage)) - } - // filter port 0 for unit tests - val endpointsWithoutZeroPort = endpoints.map(ep => ep.port).filter(_ != 0) - val distinctPorts = endpointsWithoutZeroPort.distinct - val distinctProtocols = endpoints.map(ep => ep.protocolType).distinct - - require(distinctPorts.size == endpointsWithoutZeroPort.size, "Each listener must have a different port") - require(distinctProtocols.size == endpoints.size, "Each listener must have a different protocol") - } - // If the user did not define listeners but did define host or port, let's use them in backward compatible way // If none of those are defined, we default to PLAINTEXT://:9092 - private def getListeners(): immutable.Map[SecurityProtocol, EndPoint] = { - if (getString(KafkaConfig.ListenersProp) != null) { - validateUniquePortAndProtocol(getString(KafkaConfig.ListenersProp)) - CoreUtils.listenerListToEndPoints(getString(KafkaConfig.ListenersProp)) - } else { - CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port) - } + private def getListeners: Seq[EndPoint] = { + Option(getString(KafkaConfig.ListenersProp)).map { listenerProp => + CoreUtils.listenerListToEndPoints(listenerProp, listenerSecurityProtocolMap) + }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) } // If the user defined advertised listeners, we use those // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults // If none of these are defined, we'll use the listeners - private def getAdvertisedListeners(): immutable.Map[SecurityProtocol, EndPoint] = { - if (getString(KafkaConfig.AdvertisedListenersProp) != null) { - validateUniquePortAndProtocol(getString(KafkaConfig.AdvertisedListenersProp)) - CoreUtils.listenerListToEndPoints(getString(KafkaConfig.AdvertisedListenersProp)) - } else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) { - CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort) - } else { - getListeners() + private def getAdvertisedListeners: Seq[EndPoint] = { + val advertisedListenersProp = getString(KafkaConfig.AdvertisedListenersProp) + if (advertisedListenersProp != null) + CoreUtils.listenerListToEndPoints(advertisedListenersProp, listenerSecurityProtocolMap) + else if (getString(KafkaConfig.AdvertisedHostNameProp) != null || getInt(KafkaConfig.AdvertisedPortProp) != null) + CoreUtils.listenerListToEndPoints("PLAINTEXT://" + advertisedHostName + ":" + advertisedPort, listenerSecurityProtocolMap) + else + getListeners + } + + private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, SecurityProtocol) = { + Option(getString(KafkaConfig.InterBrokerListenerNameProp)) match { + case Some(_) if originals.containsKey(KafkaConfig.InterBrokerSecurityProtocolProp) => + throw new ConfigException(s"Only one of ${KafkaConfig.InterBrokerListenerNameProp} and " + + s"${KafkaConfig.InterBrokerSecurityProtocolProp} should be set.") + case Some(name) => + val listenerName = ListenerName.normalised(name) + val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName, + throw new ConfigException(s"Listener with name ${listenerName.value} defined in " + + s"${KafkaConfig.InterBrokerListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.")) + (listenerName, securityProtocol) + case None => + val securityProtocol = getSecurityProtocol(getString(KafkaConfig.InterBrokerSecurityProtocolProp), + KafkaConfig.InterBrokerSecurityProtocolProp) + (ListenerName.forSecurityProtocol(securityProtocol), securityProtocol) + } + } + + private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = { + try SecurityProtocol.forName(protocolName) + catch { + case e: IllegalArgumentException => + throw new ConfigException(s"Invalid security protocol `$protocolName` defined in $configName") + } + } + + private def getListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = { + getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp)) + .map { case (listenerName, protocolName) => + ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, KafkaConfig.ListenerSecurityProtocolMapProp) } } @@ -1043,12 +1076,16 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra "offsets.commit.required.acks must be greater or equal -1 and less or equal to offsets.topic.replication.factor") require(BrokerCompressionCodec.isValid(compressionType), "compression.type : " + compressionType + " is not valid." + " Valid options are " + BrokerCompressionCodec.brokerCompressionOptions.mkString(",")) - require(advertisedListeners.keySet.contains(interBrokerSecurityProtocol), - s"${KafkaConfig.InterBrokerSecurityProtocolProp} must be a protocol in the configured set of ${KafkaConfig.AdvertisedListenersProp}. " + - s"The valid options based on currently configured protocols are ${advertisedListeners.keySet}") - require(advertisedListeners.keySet.subsetOf(listeners.keySet), - s"${KafkaConfig.AdvertisedListenersProp} protocols must be equal to or a subset of ${KafkaConfig.ListenersProp} protocols. " + - s"Found ${advertisedListeners.keySet}. The valid options based on currently configured protocols are ${listeners.keySet}" + + val advertisedListenerNames = advertisedListeners.map(_.listenerName).toSet + val listenerNames = listeners.map(_.listenerName).toSet + require(advertisedListenerNames.contains(interBrokerListenerName), + s"${KafkaConfig.InterBrokerListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + + s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") + require(advertisedListenerNames.subsetOf(listenerNames), + s"${KafkaConfig.AdvertisedListenersProp} listener names must be equal to or a subset of the ones defined in ${KafkaConfig.ListenersProp}. " + + s"Found ${advertisedListenerNames.map(_.value).mkString(",")}. The valid options based on the current configuration " + + s"are ${listenerNames.map(_.value).mkString(",")}" ) require(interBrokerProtocolVersion >= logMessageFormatVersion, s"log.message.format.version $logMessageFormatVersionString cannot be used when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaHealthcheck.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala index 4133145..c7b398f 100644 --- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala +++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala @@ -26,6 +26,7 @@ import kafka.cluster.EndPoint import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import org.I0Itec.zkclient.IZkStateListener +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.zookeeper.Watcher.Event.KeeperState @@ -38,7 +39,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState * we are dead. */ class KafkaHealthcheck(brokerId: Int, - advertisedEndpoints: Map[SecurityProtocol, EndPoint], + advertisedEndpoints: Seq[EndPoint], zkUtils: ZkUtils, rack: Option[String], interBrokerProtocolVersion: ApiVersion) extends Logging { @@ -55,17 +56,18 @@ class KafkaHealthcheck(brokerId: Int, */ def register() { val jmxPort = System.getProperty("com.sun.management.jmxremote.port", "-1").toInt - val updatedEndpoints = advertisedEndpoints.mapValues(endpoint => + val updatedEndpoints = advertisedEndpoints.map(endpoint => if (endpoint.host == null || endpoint.host.trim.isEmpty) - EndPoint(InetAddress.getLocalHost.getCanonicalHostName, endpoint.port, endpoint.protocolType) + endpoint.copy(host = InetAddress.getLocalHost.getCanonicalHostName) else endpoint ) - // the default host and port are here for compatibility with older client - // only PLAINTEXT is supported as default - // if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect - val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null)) + // the default host and port are here for compatibility with older clients that only support PLAINTEXT + // we choose the first plaintext port, if there is one + // or we register an empty endpoint, which means that older clients will not be able to connect + val plaintextEndpoint = updatedEndpoints.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).getOrElse( + new EndPoint(null, -1, null, null)) zkUtils.registerBrokerInZk(brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort, rack, interBrokerProtocolVersion) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 7cf3940..79548e8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -253,11 +253,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP dynamicConfigManager.startup() /* tell everyone we are alive */ - val listeners = config.advertisedListeners.map {case(protocol, endpoint) => + val listeners = config.advertisedListeners.map { endpoint => if (endpoint.port == 0) - (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType)) + endpoint.copy(port = socketServer.boundPort(endpoint.listenerName)) else - (protocol, endpoint) + endpoint } kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils, config.rack, config.interBrokerProtocolVersion) @@ -345,7 +345,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP private def controlledShutdown() { def node(broker: Broker): Node = { - val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) + val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName) new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port) } @@ -482,8 +482,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (channel != null) channel.disconnect() - channel = new BlockingChannel(broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).host, - broker.getBrokerEndPoint(config.interBrokerSecurityProtocol).port, + val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName) + channel = new BlockingChannel(brokerEndPoint.host, + brokerEndPoint.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, config.controllerSocketTimeoutMs) @@ -619,7 +620,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP def getLogManager(): LogManager = logManager - def boundPort(protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Int = socketServer.boundPort(protocol) + def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName) private def createLogManager(zkClient: ZkClient, brokerState: BrokerState): LogManager = { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index feef6ae..5c28e14 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -17,7 +17,6 @@ package kafka.server -import java.util.EnumMap import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.{Seq, Set, mutable} @@ -29,7 +28,8 @@ import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch} import kafka.utils.CoreUtils._ import kafka.utils.Logging import org.apache.kafka.common.Node -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, PartitionState, UpdateMetadataRequest} /** @@ -41,7 +41,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]() private var controllerId: Option[Int] = None private val aliveBrokers = mutable.Map[Int, Broker]() - private val aliveNodes = mutable.Map[Int, collection.Map[SecurityProtocol, Node]]() + private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]() private val partitionMetadataLock = new ReentrantReadWriteLock() this.logIdent = s"[Kafka Metadata Cache on broker $brokerId] " @@ -49,10 +49,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. // filterUnavailableEndpoints exists to support v0 MetadataResponses - private def getEndpoints(brokers: Iterable[Int], protocol: SecurityProtocol, filterUnavailableEndpoints: Boolean): Seq[Node] = { + private def getEndpoints(brokers: Iterable[Int], listenerName: ListenerName, filterUnavailableEndpoints: Boolean): Seq[Node] = { val result = new mutable.ArrayBuffer[Node](math.min(aliveBrokers.size, brokers.size)) brokers.foreach { brokerId => - val endpoint = getAliveEndpoint(brokerId, protocol) match { + val endpoint = getAliveEndpoint(brokerId, listenerName) match { case None => if (!filterUnavailableEndpoints) Some(new Node(brokerId, "", -1)) else None case Some(node) => Some(node) } @@ -61,23 +61,23 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { result } - private def getAliveEndpoint(brokerId: Int, protocol: SecurityProtocol): Option[Node] = + private def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] = aliveNodes.get(brokerId).map { nodeMap => - nodeMap.getOrElse(protocol, - throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not support security protocol `$protocol`")) + nodeMap.getOrElse(listenerName, + throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`")) } // errorUnavailableEndpoints exists to support v0 MetadataResponses - private def getPartitionMetadata(topic: String, protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { + private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = { cache.get(topic).map { partitions => partitions.map { case (partitionId, partitionState) => val topicPartition = TopicAndPartition(topic, partitionId) val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr - val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, protocol) + val maybeLeader = getAliveEndpoint(leaderAndIsr.leader, listenerName) val replicas = partitionState.allReplicas - val replicaInfo = getEndpoints(replicas, protocol, errorUnavailableEndpoints) + val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints) maybeLeader match { case None => @@ -87,7 +87,7 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { case Some(leader) => val isr = leaderAndIsr.isr - val isrInfo = getEndpoints(isr, protocol, errorUnavailableEndpoints) + val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints) if (replicaInfo.size < replicas.size) { debug(s"Error while fetching metadata for $topicPartition: replica information not available for " + @@ -110,10 +110,10 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } // errorUnavailableEndpoints exists to support v0 MetadataResponses - def getTopicMetadata(topics: Set[String], protocol: SecurityProtocol, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { + def getTopicMetadata(topics: Set[String], listenerName: ListenerName, errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = { inReadLock(partitionMetadataLock) { topics.toSeq.flatMap { topic => - getPartitionMetadata(topic, protocol, errorUnavailableEndpoints).map { partitionMetadata => + getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata => new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava) } } @@ -164,13 +164,16 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { aliveNodes.clear() aliveBrokers.clear() updateMetadataRequest.liveBrokers.asScala.foreach { broker => - val nodes = new EnumMap[SecurityProtocol, Node](classOf[SecurityProtocol]) - val endPoints = new EnumMap[SecurityProtocol, EndPoint](classOf[SecurityProtocol]) - broker.endPoints.asScala.foreach { case (protocol, ep) => - endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol)) - nodes.put(protocol, new Node(broker.id, ep.host, ep.port)) + // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which + // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could + // move to `AnyRefMap`, which has comparable performance. + val nodes = new java.util.HashMap[ListenerName, Node] + val endPoints = new mutable.ArrayBuffer[EndPoint] + broker.endPoints.asScala.foreach { ep => + endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol) + nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port)) } - aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack)) + aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 87b8d90..199bb67 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -877,7 +877,7 @@ class ReplicaManager(val config: KafkaConfig, // we do not need to check if the leader exists again since this has been done at the beginning of this process val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition => partition.topicPartition -> BrokerAndInitialOffset( - metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerSecurityProtocol), + metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName), partition.getReplica().get.logEndOffset.messageOffset)).toMap replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 8f86f66..fa8febc 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -32,6 +32,7 @@ import kafka.client.ClientUtils import kafka.network.BlockingChannel import kafka.api.PartitionOffsetRequestInfo import org.I0Itec.zkclient.exception.ZkNoNodeException +import org.apache.kafka.common.network.ListenerName object ConsumerOffsetChecker extends Logging { @@ -42,7 +43,7 @@ object ConsumerOffsetChecker extends Logging { private def getConsumer(zkUtils: ZkUtils, bid: Int): Option[SimpleConsumer] = { try { zkUtils.getBrokerInfo(bid) - .map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))) .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerOffsetChecker")) .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))) } catch { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala index 96a33b1..8c6a8ba 100755 --- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala +++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala @@ -18,10 +18,11 @@ package kafka.tools import org.I0Itec.zkclient.ZkClient -import kafka.consumer.{SimpleConsumer, ConsumerConfig} -import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest} -import kafka.common.{TopicAndPartition, KafkaException} -import kafka.utils.{ZKGroupTopicDirs, ZkUtils, CoreUtils} +import kafka.consumer.{ConsumerConfig, SimpleConsumer} +import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo} +import kafka.common.{KafkaException, TopicAndPartition} +import kafka.utils.{CoreUtils, ZKGroupTopicDirs, ZkUtils} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils @@ -67,9 +68,8 @@ object UpdateOffsetsInZK { zkUtils.getBrokerInfo(broker) match { case Some(brokerInfo) => - val consumer = new SimpleConsumer(brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host, - brokerInfo.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port, - 10000, 100 * 1024, "UpdateOffsetsInZk") + val brokerEndPoint = brokerInfo.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + val consumer = new SimpleConsumer(brokerEndPoint.host, brokerEndPoint.port, 10000, 100 * 1024, "UpdateOffsetsInZk") val topicAndPartition = TopicAndPartition(topic, partition) val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1))) val offset = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/utils/CoreUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala index 7fe9cc9..d427e9c 100755 --- a/core/src/main/scala/kafka/utils/CoreUtils.scala +++ b/core/src/main/scala/kafka/utils/CoreUtils.scala @@ -31,6 +31,7 @@ import org.apache.kafka.common.protocol.SecurityProtocol import scala.collection._ import scala.collection.mutable import kafka.cluster.EndPoint +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.utils.Utils /** @@ -254,9 +255,26 @@ object CoreUtils extends Logging { .keys } - def listenerListToEndPoints(listeners: String): immutable.Map[SecurityProtocol, EndPoint] = { - val listenerList = parseCsvList(listeners) - listenerList.map(listener => EndPoint.createEndPoint(listener)).map(ep => ep.protocolType -> ep).toMap + def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = { + def validate(endPoints: Seq[EndPoint]): Unit = { + // filter port 0 for unit tests + val portsExcludingZero = endPoints.map(_.port).filter(_ != 0) + val distinctPorts = portsExcludingZero.distinct + val distinctListenerNames = endPoints.map(_.listenerName).distinct + + require(distinctPorts.size == portsExcludingZero.size, s"Each listener must have a different port, listeners: $listeners") + require(distinctListenerNames.size == endPoints.size, s"Each listener must have a different name, listeners: $listeners") + } + + val endPoints = try { + val listenerList = parseCsvList(listeners) + listenerList.map(EndPoint.createEndPoint(_, Some(securityProtocolMap))) + } catch { + case e: Exception => + throw new IllegalArgumentException(s"Error creating broker listeners from '$listeners': ${e.getMessage}", e) + } + validate(endPoints) + endPoints } def generateUuidAsBase64(): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index fcb5648..7a6bd63 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -31,6 +31,7 @@ import org.I0Itec.zkclient.exception.{ZkBadVersionException, ZkException, ZkMars import org.I0Itec.zkclient.serialize.ZkSerializer import org.I0Itec.zkclient.{ZkClient, ZkConnection} import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Time import org.apache.zookeeper.AsyncCallback.{DataCallback, StringCallback} @@ -252,10 +253,6 @@ class ZkUtils(val zkClient: ZkClient, brokerIds.map(_.toInt).map(getBrokerInfo(_)).filter(_.isDefined).map(_.get) } - def getAllBrokerEndPointsForChannel(protocolType: SecurityProtocol): Seq[BrokerEndPoint] = { - getAllBrokersInCluster().map(_.getBrokerEndPoint(protocolType)) - } - def getLeaderAndIsrForPartition(topic: String, partition: Int):Option[LeaderAndIsr] = { ReplicationUtils.getLeaderIsrAndEpochForPartition(this, topic, partition).map(_.leaderAndIsr) } @@ -266,15 +263,8 @@ class ZkUtils(val zkClient: ZkClient, } def getLeaderForPartition(topic: String, partition: Int): Option[Int] = { - val leaderAndIsrOpt = readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1 - leaderAndIsrOpt match { - case Some(leaderAndIsr) => - Json.parseFull(leaderAndIsr) match { - case Some(m) => - Some(m.asInstanceOf[Map[String, Any]].get("leader").get.asInstanceOf[Int]) - case None => None - } - case None => None + readDataMaybeNull(getTopicPartitionLeaderAndIsrPath(topic, partition))._1.flatMap { leaderAndIsr => + Json.parseFull(leaderAndIsr).map(_.asInstanceOf[Map[String, Any]]("leader").asInstanceOf[Int]) } } @@ -341,11 +331,11 @@ class ZkUtils(val zkClient: ZkClient, } /** - * Register brokers with v3 json format (which includes multiple endpoints and rack) if + * Register brokers with v4 json format (which includes multiple endpoints and rack) if * the apiVersion is 0.10.0.X or above. Register the broker with v2 json format otherwise. * Due to KAFKA-3100, 0.9.0.0 broker and old clients will break if JSON version is above 2. - * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X without having to upgrade - * to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case). + * We include v2 to make it possible for the broker to migrate from 0.9.0.0 to 0.10.0.X or above without having to + * upgrade to 0.9.0.1 first (clients have to be upgraded to 0.9.0.1 in any case). * * This format also includes default endpoints for compatibility with older clients. * @@ -360,25 +350,15 @@ class ZkUtils(val zkClient: ZkClient, def registerBrokerInZk(id: Int, host: String, port: Int, - advertisedEndpoints: collection.Map[SecurityProtocol, EndPoint], + advertisedEndpoints: Seq[EndPoint], jmxPort: Int, rack: Option[String], apiVersion: ApiVersion) { val brokerIdPath = BrokerIdsPath + "/" + id - val timestamp = Time.SYSTEM.milliseconds.toString - - val version = if (apiVersion >= KAFKA_0_10_0_IV1) 3 else 2 - var jsonMap = Map("version" -> version, - "host" -> host, - "port" -> port, - "endpoints" -> advertisedEndpoints.values.map(_.connectionString).toArray, - "jmx_port" -> jmxPort, - "timestamp" -> timestamp - ) - rack.foreach(rack => if (version >= 3) jsonMap += ("rack" -> rack)) - - val brokerInfo = Json.encode(jsonMap) - registerBrokerInZk(brokerIdPath, brokerInfo) + // see method documentation for reason why we do this + val version = if (apiVersion >= KAFKA_0_10_0_IV1) 4 else 2 + val json = Broker.toJson(version, id, host, port, advertisedEndpoints, jmxPort, rack) + registerBrokerInZk(brokerIdPath, json) info("Registered broker %d at path %s with addresses: %s".format(id, brokerIdPath, advertisedEndpoints.mkString(","))) } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a2fc2d5..9e1efa6 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -40,6 +40,7 @@ import scala.collection.mutable import scala.collection.mutable.Buffer import org.apache.kafka.common.KafkaException import kafka.admin.AdminUtils +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.record.MemoryRecords class AuthorizerIntegrationTest extends BaseRequestTest { @@ -208,15 +209,17 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createUpdateMetadataRequest = { val partitionState = Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava + val securityProtocol = SecurityProtocol.PLAINTEXT val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, - Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava, null)).asJava + Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, + ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava new requests.UpdateMetadataRequest.Builder(brokerId, Int.MaxValue, partitionState, brokers).build() } private def createJoinGroupRequest = { new JoinGroupRequest.Builder(group, 10000, "", "consumer", List( new JoinGroupRequest.ProtocolMetadata("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) - .setRebalanceTimeout(60000).build(); + .setRebalanceTimeout(60000).build() } private def createSyncGroupRequest = { http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala index 8d676d1..852377c 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerBounceTest.scala @@ -113,7 +113,7 @@ class ProducerBounceTest extends KafkaServerTestHarness { val newLeaders = (0 until numPartitions).map(i => TestUtils.waitUntilMetadataIsPropagated(servers, topic1, i)) val fetchResponses = newLeaders.zipWithIndex.map { case (leader, partition) => // Consumers must be instantiated after all the restarts since they use random ports each time they start up - val consumer = new SimpleConsumer("localhost", servers(leader).boundPort(), 100, 1024 * 1024, "") + val consumer = new SimpleConsumer("localhost", boundPort(servers(leader)), 100, 1024 * 1024, "") val response = consumer.fetch(new FetchRequestBuilder().addFetch(topic1, partition, 0, Int.MaxValue).build()).messageSet(topic1, partition) consumer.close response http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala index fc1ceec..874637b 100755 --- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala @@ -75,7 +75,7 @@ class ProducerCompressionTest(compression: String) extends ZooKeeperTestHarness props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") var producer = new KafkaProducer[Array[Byte],Array[Byte]](props) - val consumer = new SimpleConsumer("localhost", server.boundPort(), 100, 1024*1024, "") + val consumer = new SimpleConsumer("localhost", TestUtils.boundPort(server), 100, 1024*1024, "") try { // create topic http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala new file mode 100644 index 0000000..5bd6414 --- /dev/null +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.io.File +import java.util.Collections +import java.util.concurrent.TimeUnit + +import kafka.common.Topic +import kafka.coordinator.OffsetConfig +import kafka.utils.{CoreUtils, TestUtils} +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.network.{ListenerName, Mode} +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.Assert.assertEquals +import org.junit.{After, Before, Test} + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ + +class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness { + + private val trustStoreFile = File.createTempFile("truststore", ".jks") + private val servers = new ArrayBuffer[KafkaServer] + private val producers = mutable.Map[ListenerName, KafkaProducer[Array[Byte], Array[Byte]]]() + private val consumers = mutable.Map[ListenerName, KafkaConsumer[Array[Byte], Array[Byte]]]() + + @Before + override def setUp(): Unit = { + super.setUp() + // 2 brokers so that we can test that the data propagates correctly via UpdateMetadadaRequest + val numServers = 2 + + (0 until numServers).foreach { brokerId => + + val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile)) + // Ensure that we can support multiple listeners per security protocol and multiple security protocols + props.put(KafkaConfig.ListenersProp, "SECURE_INTERNAL://localhost:0, INTERNAL://localhost:0, " + + "SECURE_EXTERNAL://localhost:0, EXTERNAL://localhost:0") + props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "INTERNAL:PLAINTEXT, SECURE_INTERNAL:SSL," + + "EXTERNAL:PLAINTEXT, SECURE_EXTERNAL:SSL") + props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL") + props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")) + + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) + } + + val serverConfig = servers.head.config + assertEquals(4, serverConfig.listeners.size) + + TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions, + replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs) + + serverConfig.listeners.foreach { endPoint => + val listenerName = endPoint.listenerName + + TestUtils.createTopic(zkUtils, listenerName.value, 2, 2, servers) + + val trustStoreFile = + if (endPoint.securityProtocol == SecurityProtocol.SSL) Some(this.trustStoreFile) + else None + + val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName) + + producers(listenerName) = TestUtils.createNewProducer(bootstrapServers, acks = -1, + securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile) + + consumers(listenerName) = TestUtils.createNewConsumer(bootstrapServers, groupId = listenerName.value, + securityProtocol = endPoint.securityProtocol, trustStoreFile = trustStoreFile) + } + } + + @After + override def tearDown() { + producers.values.foreach(_.close()) + consumers.values.foreach(_.close()) + servers.foreach { s => + s.shutdown() + CoreUtils.delete(s.config.logDirs) + } + super.tearDown() + } + + /** + * Tests that we can produce and consume to/from all broker-defined listeners and security protocols. We produce + * with acks=-1 to ensure that replication is also working. + */ + @Test + def testProduceConsume(): Unit = { + producers.foreach { case (listenerName, producer) => + val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes, + s"value$i".getBytes)) + producerRecords.map(producer.send(_)).map(_.get(10, TimeUnit.SECONDS)) + + val consumer = consumers(listenerName) + consumer.subscribe(Collections.singleton(listenerName.value)) + val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]] + TestUtils.waitUntilTrue(() => { + records ++= consumer.poll(50).asScala + records.size == producerRecords.size + }, s"Consumed ${records.size} records until timeout instead of the expected ${producerRecords.size} records") + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index 0f846e1..d95d90d 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -22,11 +22,12 @@ import org.junit.Assert._ import org.apache.kafka.common.protocol.SecurityProtocol import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ -import kafka.utils.{ZkUtils, CoreUtils, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.server.{KafkaConfig, KafkaServer} -import org.junit.{Test, After, Before} +import org.apache.kafka.common.network.ListenerName +import org.junit.{After, Before, Test} class AddPartitionsTest extends ZooKeeperTestHarness { var configs: Seq[KafkaConfig] = null @@ -47,7 +48,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness { configs = (0 until 4).map(i => KafkaConfig.fromProps(TestUtils.createBrokerConfig(i, zkConnect, enableControlledShutdown = false))) // start all the servers servers = configs.map(c => TestUtils.createServer(c)) - brokers = servers.map(s => new Broker(s.config.brokerId, s.config.hostName, s.boundPort())) + brokers = servers.map(s => TestUtils.createBroker(s.config.brokerId, s.config.hostName, TestUtils.boundPort(s))) // create topics first createTopic(zkUtils, topic1, partitionReplicaAssignment = Map(0->Seq(0,1)), servers = servers) @@ -97,8 +98,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testIncrementPartitions", - 2000,0).topicsMetadata + val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(listenerName)), + "AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1)) val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId) assertEquals(partitionDataForTopic1.size, 3) @@ -123,8 +125,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness { // read metadata from a broker and verify the new topic partitions exist TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testManualAssignmentOfReplicas", - 2000,0).topicsMetadata + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), + brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), + "AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2)) val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId) assertEquals(partitionDataForTopic2.size, 3) @@ -148,8 +151,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 5) TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementAllServers", - 2000,0).topicsMetadata + val metadata = ClientUtils.fetchTopicMetadata(Set(topic3), + brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), + "AddPartitionsTest-testReplicaPlacementAllServers", 2000, 0).topicsMetadata val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get @@ -170,8 +174,9 @@ class AddPartitionsTest extends ZooKeeperTestHarness { TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1) TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2) - val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), brokers.map(_.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), "AddPartitionsTest-testReplicaPlacementPartialServers", - 2000,0).topicsMetadata + val metadata = ClientUtils.fetchTopicMetadata(Set(topic2), + brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), + "AddPartitionsTest-testReplicaPlacementPartialServers", 2000, 0).topicsMetadata val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala index f2a2362..924daf8 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala @@ -18,15 +18,13 @@ package kafka.admin import java.util.Properties -import kafka.cluster.Broker import kafka.common.TopicAndPartition import kafka.log.LogConfig import kafka.log.LogConfig._ import kafka.server.{ConfigType, DynamicConfig} import kafka.utils.CoreUtils._ import kafka.utils.TestUtils._ -import kafka.utils.{CoreUtils, Logging, ZkUtils} -import org.apache.kafka.common.protocol.SecurityProtocol +import kafka.utils.{CoreUtils, Logging, TestUtils, ZkUtils} import org.easymock.EasyMock._ import org.easymock.{Capture, CaptureType, EasyMock} import org.junit.{Before, Test} @@ -381,7 +379,7 @@ class ReassignPartitionsCommandTest extends Logging { brokers: Seq[Int] = Seq[Int]()): ZkUtils = { val zk = createMock(classOf[ZkUtils]) expect(zk.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Seq[String]])).andStubReturn(existingAssignment) - expect(zk.getAllBrokersInCluster()).andStubReturn(brokers.map { id => new Broker(id, "", 1, SecurityProtocol.PLAINTEXT) }) + expect(zk.getAllBrokersInCluster()).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1))) replay(zk) zk } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala index e93cae3..7806765 100644 --- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala +++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala @@ -17,13 +17,15 @@ package kafka.api -import kafka.cluster.{Broker, EndPoint} import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError} import kafka.common._ import kafka.message.{ByteBufferMessageSet, Message} import kafka.common.TopicAndPartition +import kafka.utils.TestUtils +import TestUtils.createBroker import java.nio.ByteBuffer +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.utils.Time import org.junit._ @@ -74,9 +76,8 @@ object SerializationTestUtils { TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100) ) - private val brokers = List(new Broker(0, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1011, SecurityProtocol.PLAINTEXT))), - new Broker(1, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1012, SecurityProtocol.PLAINTEXT))), - new Broker(2, Map(SecurityProtocol.PLAINTEXT -> EndPoint("localhost", 1013, SecurityProtocol.PLAINTEXT)))) + private val brokers = List(createBroker(0, "localhost", 1011), createBroker(0, "localhost", 1012), + createBroker(0, "localhost", 1013)) def createTestProducerRequest: ProducerRequest = { new ProducerRequest(1, "client 1", 0, 1000, topicDataProducerRequest) @@ -88,13 +89,9 @@ object SerializationTestUtils { TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001) ), ProducerRequest.CurrentVersion, 100) - def createTestFetchRequest: FetchRequest = { - new FetchRequest(requestInfo = requestInfos.toVector) - } + def createTestFetchRequest: FetchRequest = new FetchRequest(requestInfo = requestInfos.toVector) - def createTestFetchResponse: FetchResponse = { - FetchResponse(1, topicDataFetchResponse.toVector) - } + def createTestFetchResponse: FetchResponse = FetchResponse(1, topicDataFetchResponse.toVector) def createTestOffsetRequest = new OffsetRequest( collection.immutable.Map(TopicAndPartition(topic1, 1) -> PartitionOffsetRequestInfo(1000, 200)), @@ -156,12 +153,11 @@ object SerializationTestUtils { )) } - def createConsumerMetadataRequest: GroupCoordinatorRequest = { - GroupCoordinatorRequest("group 1", clientId = "client 1") - } + def createConsumerMetadataRequest: GroupCoordinatorRequest = GroupCoordinatorRequest("group 1", clientId = "client 1") def createConsumerMetadataResponse: GroupCoordinatorResponse = { - GroupCoordinatorResponse(Some(brokers.head.getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), Errors.NONE.code, 0) + GroupCoordinatorResponse(Some( + brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE.code, 0) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala index 5554b39..20b7e25 100644 --- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala @@ -19,7 +19,8 @@ package kafka.cluster import java.nio.ByteBuffer -import kafka.utils.Logging +import kafka.utils.{Logging, TestUtils} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Test @@ -29,14 +30,10 @@ class BrokerEndPointTest extends Logging { @Test def testHashAndEquals() { - val endpoint1 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) - val endpoint2 = new EndPoint("myhost", 9092, SecurityProtocol.PLAINTEXT) - val endpoint3 = new EndPoint("myhost", 1111, SecurityProtocol.PLAINTEXT) - val endpoint4 = new EndPoint("other", 1111, SecurityProtocol.PLAINTEXT) - val broker1 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint1)) - val broker2 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint2)) - val broker3 = new Broker(2, Map(SecurityProtocol.PLAINTEXT -> endpoint3)) - val broker4 = new Broker(1, Map(SecurityProtocol.PLAINTEXT -> endpoint4)) + val broker1 = TestUtils.createBroker(1, "myhost", 9092) + val broker2 = TestUtils.createBroker(1, "myhost", 9092) + val broker3 = TestUtils.createBroker(2, "myhost", 1111) + val broker4 = TestUtils.createBroker(1, "other", 1111) assert(broker1 == broker2) assert(broker1 != broker3) @@ -64,31 +61,36 @@ class BrokerEndPointTest extends Logging { }""" val broker = Broker.createBroker(1, brokerInfoStr) assert(broker.id == 1) - assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).host == "localhost") - assert(broker.getBrokerEndPoint(SecurityProtocol.SSL).port == 9093) + val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL)) + assert(brokerEndPoint.host == "localhost") + assert(brokerEndPoint.port == 9093) } @Test def testFromJsonV2 { - val brokerInfoStr = "{\"version\":2," + - "\"host\":\"localhost\"," + - "\"port\":9092," + - "\"jmx_port\":9999," + - "\"timestamp\":\"1416974968782\"," + - "\"endpoints\":[\"PLAINTEXT://localhost:9092\"]}" + val brokerInfoStr = """{ + "version":2, + "host":"localhost", + "port":9092, + "jmx_port":9999, + "timestamp":"1416974968782", + "endpoints":["PLAINTEXT://localhost:9092"] + }""" val broker = Broker.createBroker(1, brokerInfoStr) assert(broker.id == 1) - assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "localhost") - assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9092) + val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + assert(brokerEndPoint.host == "localhost") + assert(brokerEndPoint.port == 9092) } @Test def testFromJsonV1() = { - val brokerInfoStr = "{\"jmx_port\":-1,\"timestamp\":\"1420485325400\",\"host\":\"172.16.8.243\",\"version\":1,\"port\":9091}" + val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}""" val broker = Broker.createBroker(1, brokerInfoStr) assert(broker.id == 1) - assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).host == "172.16.8.243") - assert(broker.getBrokerEndPoint(SecurityProtocol.PLAINTEXT).port == 9091) + val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + assert(brokerEndPoint.host == "172.16.8.243") + assert(brokerEndPoint.port == 9091) } @Test @@ -122,37 +124,37 @@ class BrokerEndPointTest extends Logging { @Test def testEndpointFromUri() { var connectionString = "PLAINTEXT://localhost:9092" - var endpoint = EndPoint.createEndPoint(connectionString) + var endpoint = EndPoint.createEndPoint(connectionString, None) assert(endpoint.host == "localhost") assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://localhost:9092") // KAFKA-3719 connectionString = "PLAINTEXT://local_host:9092" - endpoint = EndPoint.createEndPoint(connectionString) + endpoint = EndPoint.createEndPoint(connectionString, None) assert(endpoint.host == "local_host") assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://local_host:9092") // also test for default bind connectionString = "PLAINTEXT://:9092" - endpoint = EndPoint.createEndPoint(connectionString) + endpoint = EndPoint.createEndPoint(connectionString, None) assert(endpoint.host == null) assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://:9092") // also test for ipv6 connectionString = "PLAINTEXT://[::1]:9092" - endpoint = EndPoint.createEndPoint(connectionString) + endpoint = EndPoint.createEndPoint(connectionString, None) assert(endpoint.host == "::1") assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://[::1]:9092") // test for ipv6 with % character connectionString = "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092" - endpoint = EndPoint.createEndPoint(connectionString) + endpoint = EndPoint.createEndPoint(connectionString, None) assert(endpoint.host == "fe80::b1da:69ca:57f7:63d8%3") assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://[fe80::b1da:69ca:57f7:63d8%3]:9092") // test hostname connectionString = "PLAINTEXT://MyHostname:9092" - endpoint = EndPoint.createEndPoint(connectionString) + endpoint = EndPoint.createEndPoint(connectionString, None) assert(endpoint.host == "MyHostname") assert(endpoint.port == 9092) assert(endpoint.connectionString == "PLAINTEXT://MyHostname:9092") http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala index 24ed954..b402b25 100644 --- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala @@ -22,7 +22,7 @@ import java.io.File import kafka.admin.AdminUtils import kafka.api.TopicMetadataResponse import kafka.client.ClientUtils -import kafka.cluster.{Broker, BrokerEndPoint} +import kafka.cluster.BrokerEndPoint import kafka.server.{KafkaConfig, KafkaServer, NotRunning} import kafka.utils.TestUtils import kafka.utils.TestUtils._ @@ -52,7 +52,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { brokerEndPoints = Seq( // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use // `securityProtocol` instead of PLAINTEXT below - new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, server1.boundPort(SecurityProtocol.PLAINTEXT)) + new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, TestUtils.boundPort(server1)) ) } @@ -69,7 +69,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server1)) val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", - 2000,0).topicsMetadata + 2000, 0).topicsMetadata assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode) assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode) assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) @@ -143,12 +143,12 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use // `securityProtocol` instead of PLAINTEXT below val adHocEndpoint = new BrokerEndPoint(adHocServer.config.brokerId, adHocServer.config.hostName, - adHocServer.boundPort(SecurityProtocol.PLAINTEXT)) + TestUtils.boundPort(adHocServer)) // auto create topic on "bad" endpoint val topic = "testAutoCreateTopic" val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), Seq(adHocEndpoint), "TopicMetadataTest-testAutoCreateTopic", - 2000,0).topicsMetadata + 2000, 0).topicsMetadata assertEquals(Errors.INVALID_REPLICATION_FACTOR.code, topicsMetadata.head.errorCode) assertEquals("Expecting metadata only for 1 topic", 1, topicsMetadata.size) assertEquals("Expecting metadata for the test topic", topic, topicsMetadata.head.topic) @@ -163,7 +163,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { val topic1 = "testAutoCreate_Topic" val topic2 = "testAutoCreate.Topic" var topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1, topic2), brokerEndPoints, "TopicMetadataTest-testAutoCreateTopic", - 2000,0).topicsMetadata + 2000, 0).topicsMetadata assertEquals("Expecting metadata for 2 topics", 2, topicsMetadata.size) assertEquals("Expecting metadata for topic1", topic1, topicsMetadata.head.topic) assertEquals(Errors.LEADER_NOT_AVAILABLE.code, topicsMetadata.head.errorCode) @@ -176,7 +176,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { // retry the metadata for the first auto created topic topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokerEndPoints, "TopicMetadataTest-testBasicTopicMetadata", - 2000,0).topicsMetadata + 2000, 0).topicsMetadata assertEquals(Errors.NONE.code, topicsMetadata.head.errorCode) assertEquals(Errors.NONE.code, topicsMetadata.head.partitionsMetadata.head.errorCode) var partitionMetadata = topicsMetadata.head.partitionsMetadata @@ -188,24 +188,21 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { private def checkIsr(servers: Seq[KafkaServer]): Unit = { val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state) - val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map( - x => new BrokerEndPoint(x.config.brokerId, - if (x.config.hostName.nonEmpty) x.config.hostName else "localhost", - x.boundPort()) - ) + val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map { x => + new BrokerEndPoint(x.config.brokerId, + if (x.config.hostName.nonEmpty) x.config.hostName else "localhost", + TestUtils.boundPort(x)) + } // Assert that topic metadata at new brokers is updated correctly activeBrokers.foreach(x => { var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1) waitUntilTrue(() => { - metadata = ClientUtils.fetchTopicMetadata( - Set.empty, - Seq(new BrokerEndPoint( - x.config.brokerId, - if (x.config.hostName.nonEmpty) x.config.hostName else "localhost", - x.boundPort())), - "TopicMetadataTest-testBasicTopicMetadata", - 2000, 0) + metadata = ClientUtils.fetchTopicMetadata(Set.empty, + Seq(new BrokerEndPoint(x.config.brokerId, + if (x.config.hostName.nonEmpty) x.config.hostName else "localhost", + TestUtils.boundPort(x))), + "TopicMetadataTest-testBasicTopicMetadata", 2000, 0) metadata.topicsMetadata.nonEmpty && metadata.topicsMetadata.head.partitionsMetadata.nonEmpty && expectedIsr.sortBy(_.id) == metadata.topicsMetadata.head.partitionsMetadata.head.isr.sortBy(_.id) @@ -263,9 +260,7 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { waitUntilTrue(() => { val foundMetadata = ClientUtils.fetchTopicMetadata( Set.empty, - Seq(new Broker(x.config.brokerId, - x.config.hostName, - x.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)), + Seq(new BrokerEndPoint(x.config.brokerId, x.config.hostName, TestUtils.boundPort(x))), "TopicMetadataTest-testBasicTopicMetadata", 2000, 0) topicMetadata.brokers.sortBy(_.id) == foundMetadata.brokers.sortBy(_.id) && topicMetadata.topicsMetadata.sortBy(_.topic) == foundMetadata.topicsMetadata.sortBy(_.topic) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 003c04c..3f59302 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -44,7 +44,9 @@ class FetcherTest extends KafkaServerTestHarness { super.setUp TestUtils.createTopic(zkUtils, topic, partitionReplicaAssignment = Map(0 -> Seq(configs.head.brokerId)), servers = servers) - val cluster = new Cluster(servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort()))) + val cluster = new Cluster(servers.map { s => + new Broker(s.config.brokerId, "localhost", boundPort(s), listenerName, securityProtocol) + }) fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkUtils) fetcher.stopConnections() http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 4bbdedb..270fca2 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -19,6 +19,7 @@ package kafka.integration import java.io.File import java.util.Arrays + import kafka.common.KafkaException import kafka.server._ import kafka.utils.{CoreUtils, TestUtils} @@ -26,9 +27,12 @@ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.{After, Before} + import scala.collection.mutable.Buffer import java.util.Properties +import org.apache.kafka.common.network.ListenerName + /** * A test harness that brings up some number of broker nodes */ @@ -66,7 +70,10 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id) + def boundPort(server: KafkaServer): Int = server.boundPort(listenerName) + protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT + protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol) protected def trustStoreFile: Option[File] = None protected def saslProperties: Option[Properties] = None http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index bdf116f..e3115e1 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -36,7 +36,7 @@ trait ProducerConsumerTestHarness extends KafkaServerTestHarness { encoder = classOf[StringEncoder].getName, keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[StaticPartitioner].getName) - consumer = new SimpleConsumer(host, servers.head.boundPort(), 1000000, 64 * 1024, "") + consumer = new SimpleConsumer(host, TestUtils.boundPort(servers.head), 1000000, 64 * 1024, "") } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 8a0ae1a..b8e3a8a 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -30,7 +30,7 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.network.NetworkSend +import org.apache.kafka.common.network.{ListenerName, NetworkSend} import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader} @@ -99,7 +99,7 @@ class SocketServerTest extends JUnitSuite { } def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT) = { - val socket = new Socket("localhost", s.boundPort(protocol)) + val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol))) sockets += socket socket } @@ -280,7 +280,8 @@ class SocketServerTest extends JUnitSuite { val sslContext = SSLContext.getInstance("TLSv1.2") sslContext.init(null, Array(TestUtils.trustAllCerts), new java.security.SecureRandom()) val socketFactory = sslContext.getSocketFactory - val sslSocket = socketFactory.createSocket("localhost", overrideServer.boundPort(SecurityProtocol.SSL)).asInstanceOf[SSLSocket] + val sslSocket = socketFactory.createSocket("localhost", + overrideServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))).asInstanceOf[SSLSocket] sslSocket.setNeedClientAuth(false) val apiKey = ApiKeys.PRODUCE.id @@ -324,9 +325,10 @@ class SocketServerTest extends JUnitSuite { val serverMetrics = new Metrics var conn: Socket = null val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol): Processor = { + override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + protocol: SecurityProtocol): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, - config.connectionsMaxIdleMs, protocol, config.values, metrics, credentialProvider) { + config.connectionsMaxIdleMs, listenerName, protocol, config.values, metrics, credentialProvider) { override protected[network] def sendResponse(response: RequestChannel.Response) { conn.close() super.sendResponse(response) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index f5943d6..7a00f2a 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -36,6 +36,7 @@ import kafka.utils.TestUtils._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import kafka.utils._ +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.utils.Time @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") @@ -49,7 +50,7 @@ class AsyncProducerTest { val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port = 65534)) val configs = props.map(KafkaConfig.fromProps) val brokerList = configs.map { config => - val endPoint = config.advertisedListeners.get(SecurityProtocol.PLAINTEXT).get + val endPoint = config.advertisedListeners.find(_.securityProtocol == SecurityProtocol.PLAINTEXT).get org.apache.kafka.common.utils.Utils.formatAddress(endPoint.host, endPoint.port) }.mkString(",") http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 769ea33..63ec83e 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -53,13 +53,13 @@ class ProducerTest extends ZooKeeperTestHarness with Logging{ // ports and then get a consumer instance that will be pointed at the correct port def getConsumer1() = { if (consumer1 == null) - consumer1 = new SimpleConsumer("localhost", server1.boundPort(), 1000000, 64*1024, "") + consumer1 = new SimpleConsumer("localhost", TestUtils.boundPort(server1), 1000000, 64*1024, "") consumer1 } def getConsumer2() = { if (consumer2 == null) - consumer2 = new SimpleConsumer("localhost", server2.boundPort(), 100, 64*1024, "") + consumer2 = new SimpleConsumer("localhost", TestUtils.boundPort(server2), 100, 64*1024, "") consumer2 } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2567188/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index d63afe7..c20aab3 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -27,7 +27,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.message._ import kafka.server.KafkaConfig import kafka.utils._ -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.utils.Time import org.junit.Test import org.junit.Assert._ @@ -52,7 +52,7 @@ class SyncProducerTest extends KafkaServerTestHarness { def testReachableServer() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) val producer = new SyncProducer(new SyncProducerConfig(props)) @@ -88,7 +88,7 @@ class SyncProducerTest extends KafkaServerTestHarness { @Test def testEmptyProduceRequest() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) val correlationId = 0 @@ -106,7 +106,7 @@ class SyncProducerTest extends KafkaServerTestHarness { @Test def testMessageSizeTooLarge() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) val producer = new SyncProducer(new SyncProducerConfig(props)) TestUtils.createTopic(zkUtils, "test", numPartitions = 1, replicationFactor = 1, servers = servers) @@ -133,7 +133,7 @@ class SyncProducerTest extends KafkaServerTestHarness { @Test def testMessageSizeTooLargeWithAckZero() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) props.put("request.required.acks", "0") @@ -159,7 +159,7 @@ class SyncProducerTest extends KafkaServerTestHarness { @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -207,7 +207,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val timeoutMs = 500 val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -233,7 +233,7 @@ class SyncProducerTest extends KafkaServerTestHarness { def testProduceRequestWithNoResponse() { val server = servers.head - val port = server.socketServer.boundPort(SecurityProtocol.PLAINTEXT) + val port = TestUtils.boundPort(server) val props = TestUtils.getSyncProducerConfig(port) val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId @@ -249,7 +249,7 @@ class SyncProducerTest extends KafkaServerTestHarness { def testNotEnoughReplicas() { val topicName = "minisrtest" val server = servers.head - val props = TestUtils.getSyncProducerConfig(server.socketServer.boundPort()) + val props = TestUtils.getSyncProducerConfig(boundPort(server)) props.put("request.required.acks", "-1")