Repository: kafka Updated Branches: refs/heads/trunk 1eac3f33f -> ca0c071c1
http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SaslSetup.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index 36b9d41..29aea61 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -46,7 +46,8 @@ trait SaslSetup { private var serverKeytabFile: Option[File] = null private var clientKeytabFile: Option[File] = null - def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { + def startSasl(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], + mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = JaasTestUtils.KafkaServerContextName) { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI")) @@ -63,16 +64,19 @@ trait SaslSetup { this.clientKeytabFile = None this.serverKeytabFile = None } - setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) + setJaasConfiguration(mode, kafkaServerJaasEntryName, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) if (mode == Both || mode == ZkSasl) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } - protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { + protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerEntryName: String, + kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { val jaasFile = mode match { case ZkSasl => JaasTestUtils.writeZkFile() - case KafkaSasl 
=> JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) - case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) + case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, kafkaServerSaslMechanisms, + kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) + case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, kafkaServerSaslMechanisms, + kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) } // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) @@ -104,5 +108,7 @@ trait SaslSetup { props } - def jaasClientLoginModule(clientSaslMechanism: String): String = JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) + def jaasClientLoginModule(clientSaslMechanism: String): String = + JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index 97faa36..445a59c 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -12,20 +12,22 @@ */ package kafka.api +import kafka.utils.JaasTestUtils import kafka.zk.ZooKeeperTestHarness import org.junit.{After, Before} trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { protected val zkSaslEnabled: Boolean + protected val kafkaServerJaasEntryName = JaasTestUtils.KafkaServerContextName protected val kafkaClientSaslMechanism = "GSSAPI" protected val kafkaServerSaslMechanisms = 
List(kafkaClientSaslMechanism) @Before override def setUp() { if (zkSaslEnabled) - startSasl(Both, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)) + startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), Both, kafkaServerJaasEntryName) else - startSasl(KafkaSasl, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)) + startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName) super.setUp } http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 064e783..4eca6e2 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @Before override def setUp { - startSasl(ZkSasl, List.empty, None) + startSasl(List.empty, None, ZkSasl) super.setUp } } http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala index 5bd6414..37db174 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala @@ -28,6 +28,7 @@ import kafka.utils.{CoreUtils, TestUtils} import 
kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.config.SslConfigs import org.apache.kafka.common.network.{ListenerName, Mode} import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.Assert.assertEquals @@ -61,6 +62,13 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL") props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, Some(trustStoreFile), s"server$brokerId")) + // set listener-specific configs and set an invalid path for the global config to verify that the overrides work + Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName => + props.put(new ListenerName(listenerName).configPrefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, + props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)) + } + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path") + servers += TestUtils.createServer(KafkaConfig.fromProps(props)) } @@ -109,7 +117,7 @@ class MultipleListenersWithSameSecurityProtocolTest extends ZooKeeperTestHarness producers.foreach { case (listenerName, producer) => val producerRecords = (1 to 10).map(i => new ProducerRecord(listenerName.value, s"key$i".getBytes, s"value$i".getBytes)) - producerRecords.map(producer.send(_)).map(_.get(10, TimeUnit.SECONDS)) + producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS)) val consumer = consumers(listenerName) consumer.subscribe(Collections.singleton(listenerName.value)) http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index bfaff0b..ff31914 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -89,7 +89,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { configureSecurityBeforeServersStart() servers = configs.map(TestUtils.createServer(_)).toBuffer - brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol) + brokerList = TestUtils.bootstrapServers(servers, listenerName) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b8e3a8a..37bc238 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite { override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, protocol: SecurityProtocol): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, - config.connectionsMaxIdleMs, listenerName, protocol, config.values, metrics, credentialProvider) { + config.connectionsMaxIdleMs, listenerName, protocol, config, metrics, credentialProvider) { override protected[network] def sendResponse(response: RequestChannel.Response) { conn.close() super.sendResponse(response) http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 0949eb7..7b90abf 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -99,7 +99,7 @@ object JaasTestUtils { private val ZkUserPassword = "fpjsecret" private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule" - private val KafkaServerContextName = "KafkaServer" + val KafkaServerContextName = "KafkaServer" val KafkaServerPrincipalUnqualifiedName = "kafka" private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + "/localhost@EXAMPLE.COM" private val KafkaClientContextName = "KafkaClient" @@ -128,16 +128,22 @@ object JaasTestUtils { jaasFile.getCanonicalPath } - def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { + def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: List[String], + kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], + clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation), + kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) writeToFile(jaasFile, kafkaSections) jaasFile.getCanonicalPath } - def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { + def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: List[String], + kafkaClientSaslMechanism: Option[String], 
serverKeyTabLocation: Option[File], + clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(serverEntryName, kafkaServerSaslMechanisms, serverKeyTabLocation), + kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) writeToFile(jaasFile, kafkaSections ++ zkSections) jaasFile.getCanonicalPath } @@ -151,7 +157,7 @@ object JaasTestUtils { new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, Map("username" -> ZkUser, "password" -> ZkUserPassword)))) ) - private def kafkaServerSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { + private def kafkaServerSection(contextName: String, mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { val modules = mechanisms.map { case "GSSAPI" => Krb5LoginModule( @@ -174,7 +180,7 @@ object JaasTestUtils { debug = false).toJaasModule case mechanism => throw new IllegalArgumentException("Unsupported server mechanism " + mechanism) } - new JaasSection(KafkaServerContextName, modules) + new JaasSection(contextName, modules) } // consider refactoring if more mechanisms are added http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index c530e07..f132f9e 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -166,7 +166,7 @@ object TestUtils extends Logging { def bootstrapServers(servers: Seq[KafkaServer], listenerName: ListenerName): String = { servers.map { s => val listener = 
s.config.advertisedListeners.find(_.listenerName == listenerName).getOrElse( - sys.error(s"Could not find listener with name $listenerName")) + sys.error(s"Could not find listener with name ${listenerName.value}")) formatAddress(listener.host, s.boundPort(listenerName)) }.mkString(",") } http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index afde63f..ede0b45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -100,7 +100,7 @@ public class StreamsKafkaClient { reporters.add(new JmxReporter("kafka.admin")); final Metrics metrics = new Metrics(metricConfig, reporters, time); - final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig.values()); + final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig); final Selector selector = new Selector(streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, "kafka-client", channelBuilder);
