Repository: kafka
Updated Branches:
  refs/heads/trunk 1eac3f33f -> ca0c071c1


http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SaslSetup.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 36b9d41..29aea61 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -46,7 +46,8 @@ trait SaslSetup {
   private var serverKeytabFile: Option[File] = null
   private var clientKeytabFile: Option[File] = null
 
-  def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: 
List[String], kafkaClientSaslMechanism: Option[String]) {
+  def startSasl(kafkaServerSaslMechanisms: List[String], 
kafkaClientSaslMechanism: Option[String],
+                mode: SaslSetupMode = Both, kafkaServerJaasEntryName: String = 
JaasTestUtils.KafkaServerContextName) {
     // Important if tests leak consumers, producers or brokers
     LoginManager.closeAll()
     val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == 
Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
@@ -63,16 +64,19 @@ trait SaslSetup {
       this.clientKeytabFile = None
       this.serverKeytabFile = None
     }
-    setJaasConfiguration(mode, kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism)
+    setJaasConfiguration(mode, kafkaServerJaasEntryName, 
kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
     if (mode == Both || mode == ZkSasl)
       System.setProperty("zookeeper.authProvider.1", 
"org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
   }
 
-  protected def setJaasConfiguration(mode: SaslSetupMode, 
kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: 
Option[String]) {
+  protected def setJaasConfiguration(mode: SaslSetupMode, 
kafkaServerEntryName: String,
+                                     kafkaServerSaslMechanisms: List[String], 
kafkaClientSaslMechanism: Option[String]) {
     val jaasFile = mode match {
       case ZkSasl => JaasTestUtils.writeZkFile()
-      case KafkaSasl => 
JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
-      case Both => 
JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, 
kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
+      case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerEntryName, 
kafkaServerSaslMechanisms,
+        kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
+      case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerEntryName, 
kafkaServerSaslMechanisms,
+        kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
     }
     // This will cause a reload of the Configuration singleton when 
`getConfiguration` is called
     Configuration.setConfiguration(null)
@@ -104,5 +108,7 @@ trait SaslSetup {
     props
   }
 
-  def jaasClientLoginModule(clientSaslMechanism: String): String = 
JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
+  def jaasClientLoginModule(clientSaslMechanism: String): String =
+    JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
index 97faa36..445a59c 100644
--- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala
@@ -12,20 +12,22 @@
   */
 package kafka.api
 
+import kafka.utils.JaasTestUtils
 import kafka.zk.ZooKeeperTestHarness
 import org.junit.{After, Before}
 
 trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
   protected val zkSaslEnabled: Boolean
+  protected val kafkaServerJaasEntryName = JaasTestUtils.KafkaServerContextName
   protected val kafkaClientSaslMechanism = "GSSAPI"
   protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
 
   @Before
   override def setUp() {
     if (zkSaslEnabled)
-      startSasl(Both, kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism))
+      startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), 
Both, kafkaServerJaasEntryName)
     else
-      startSasl(KafkaSasl, kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism))
+      startSasl(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), 
KafkaSasl, kafkaServerJaasEntryName)
     super.setUp
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 064e783..4eca6e2 100644
--- 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest {
 
   @Before
   override def setUp {
-    startSasl(ZkSasl, List.empty, None)
+    startSasl(List.empty, None, ZkSasl)
     super.setUp
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
index 5bd6414..37db174 100644
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.{CoreUtils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.Assert.assertEquals
@@ -61,6 +62,13 @@ class MultipleListenersWithSameSecurityProtocolTest extends 
ZooKeeperTestHarness
       props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
       props.putAll(TestUtils.sslConfigs(Mode.SERVER, false, 
Some(trustStoreFile), s"server$brokerId"))
 
+      // set listener-specific configs and set an invalid path for the global 
config to verify that the overrides work
+      Seq("SECURE_INTERNAL", "SECURE_EXTERNAL").foreach { listenerName =>
+        props.put(new ListenerName(listenerName).configPrefix + 
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+          props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      }
+      props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
+
       servers += TestUtils.createServer(KafkaConfig.fromProps(props))
     }
 
@@ -109,7 +117,7 @@ class MultipleListenersWithSameSecurityProtocolTest extends 
ZooKeeperTestHarness
     producers.foreach { case (listenerName, producer) =>
       val producerRecords = (1 to 10).map(i => new 
ProducerRecord(listenerName.value, s"key$i".getBytes,
         s"value$i".getBytes))
-      producerRecords.map(producer.send(_)).map(_.get(10, TimeUnit.SECONDS))
+      producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
 
       val consumer = consumers(listenerName)
       consumer.subscribe(Collections.singleton(listenerName.value))

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index bfaff0b..ff31914 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -89,7 +89,7 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
     configureSecurityBeforeServersStart()
 
     servers = configs.map(TestUtils.createServer(_)).toBuffer
-    brokerList = TestUtils.getBrokerListStrFromServers(servers, 
securityProtocol)
+    brokerList = TestUtils.bootstrapServers(servers, listenerName)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b8e3a8a..37bc238 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -328,7 +328,7 @@ class SocketServerTest extends JUnitSuite {
       override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, 
listenerName: ListenerName,
                                 protocol: SecurityProtocol): Processor = {
         new Processor(id, time, config.socketRequestMaxBytes, requestChannel, 
connectionQuotas,
-          config.connectionsMaxIdleMs, listenerName, protocol, config.values, 
metrics, credentialProvider) {
+          config.connectionsMaxIdleMs, listenerName, protocol, config, 
metrics, credentialProvider) {
           override protected[network] def sendResponse(response: 
RequestChannel.Response) {
             conn.close()
             super.sendResponse(response)

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
index 0949eb7..7b90abf 100644
--- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala
@@ -99,7 +99,7 @@ object JaasTestUtils {
   private val ZkUserPassword = "fpjsecret"
   private val ZkModule = "org.apache.zookeeper.server.auth.DigestLoginModule"
 
-  private val KafkaServerContextName = "KafkaServer"
+  val KafkaServerContextName = "KafkaServer"
   val KafkaServerPrincipalUnqualifiedName = "kafka"
   private val KafkaServerPrincipal = KafkaServerPrincipalUnqualifiedName + 
"/localhost@EXAMPLE.COM"
   private val KafkaClientContextName = "KafkaClient"
@@ -128,16 +128,22 @@ object JaasTestUtils {
     jaasFile.getCanonicalPath
   }
 
-  def writeKafkaFile(kafkaServerSaslMechanisms: List[String], 
kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], 
clientKeyTabLocation: Option[File]): String = {
+  def writeKafkaFile(serverEntryName: String, kafkaServerSaslMechanisms: 
List[String],
+                     kafkaClientSaslMechanism: Option[String], 
serverKeyTabLocation: Option[File],
+                     clientKeyTabLocation: Option[File]): String = {
     val jaasFile = TestUtils.tempFile()
-    val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, 
serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, 
clientKeyTabLocation))
+    val kafkaSections = Seq(kafkaServerSection(serverEntryName, 
kafkaServerSaslMechanisms, serverKeyTabLocation),
+      kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
     writeToFile(jaasFile, kafkaSections)
     jaasFile.getCanonicalPath
   }
 
-  def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], 
kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], 
clientKeyTabLocation: Option[File]): String = {
+  def writeZkAndKafkaFiles(serverEntryName: String, kafkaServerSaslMechanisms: 
List[String],
+                           kafkaClientSaslMechanism: Option[String], 
serverKeyTabLocation: Option[File],
+                           clientKeyTabLocation: Option[File]): String = {
     val jaasFile = TestUtils.tempFile()
-    val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, 
serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, 
clientKeyTabLocation))
+    val kafkaSections = Seq(kafkaServerSection(serverEntryName, 
kafkaServerSaslMechanisms, serverKeyTabLocation),
+      kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation))
     writeToFile(jaasFile, kafkaSections ++ zkSections)
     jaasFile.getCanonicalPath
   }
@@ -151,7 +157,7 @@ object JaasTestUtils {
     new JaasSection(ZkClientContextName, Seq(JaasModule(ZkModule, false, 
Map("username" -> ZkUser, "password" -> ZkUserPassword))))
   )
 
-  private def kafkaServerSection(mechanisms: List[String], keytabLocation: 
Option[File]): JaasSection = {
+  private def kafkaServerSection(contextName: String, mechanisms: 
List[String], keytabLocation: Option[File]): JaasSection = {
     val modules = mechanisms.map {
       case "GSSAPI" =>
         Krb5LoginModule(
@@ -174,7 +180,7 @@ object JaasTestUtils {
           debug = false).toJaasModule
       case mechanism => throw new IllegalArgumentException("Unsupported server 
mechanism " + mechanism)
     }
-    new JaasSection(KafkaServerContextName, modules)
+    new JaasSection(contextName, modules)
   }
 
   // consider refactoring if more mechanisms are added

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index c530e07..f132f9e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -166,7 +166,7 @@ object TestUtils extends Logging {
   def bootstrapServers(servers: Seq[KafkaServer], listenerName: ListenerName): 
String = {
     servers.map { s =>
       val listener = s.config.advertisedListeners.find(_.listenerName == 
listenerName).getOrElse(
-        sys.error(s"Could not find listener with name $listenerName"))
+        sys.error(s"Could not find listener with name ${listenerName.value}"))
       formatAddress(listener.host, s.boundPort(listenerName))
     }.mkString(",")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ca0c071c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index afde63f..ede0b45 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -100,7 +100,7 @@ public class StreamsKafkaClient {
         reporters.add(new JmxReporter("kafka.admin"));
         final Metrics metrics = new Metrics(metricConfig, reporters, time);
 
-        final ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(streamsConfig.values());
+        final ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(streamsConfig);
 
         final Selector selector = new 
Selector(streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), 
metrics, time, "kafka-client", channelBuilder);
 

Reply via email to