Repository: kafka Updated Branches: refs/heads/trunk 8f32617e6 -> 403158b54
http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala deleted file mode 100644 index 5dc4cbc..0000000 --- a/core/src/test/scala/integration/kafka/api/SSLConsumerTest.scala +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.api - -import java.util.Properties -import java.io.File - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.consumer.Consumer -import org.apache.kafka.clients.consumer.KafkaConsumer -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.TopicPartition -import kafka.integration.KafkaServerTestHarness - -import kafka.utils.{TestUtils, Logging} -import kafka.server.KafkaConfig - -import java.util.ArrayList -import org.junit.{Test, Before, After} -import org.junit.Assert._ - -import scala.collection.mutable.Buffer -import scala.collection.JavaConversions._ -import kafka.coordinator.ConsumerCoordinator - - -/** - * Integration tests for the new consumer that cover basic usage as well as server failures - */ -class SSLConsumerTest extends KafkaServerTestHarness with Logging { - - val trustStoreFile = File.createTempFile("truststore", ".jks") - val numServers = 3 - val producerCount = 1 - val consumerCount = 2 - val producerConfig = new Properties - val consumerConfig = new Properties - - val overridingProps = new Properties() - overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) - overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, "false") // speed up shutdown - overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset - overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") - overridingProps.put(KafkaConfig.ConsumerMinSessionTimeoutMsProp, "100") // set small enough session timeout - - val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() - val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - - def generateConfigs() = - TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps)) - - val topic = "topic" - val part = 0 - val tp = new TopicPartition(topic, part) - - // configure the servers and clients - this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") - this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") - this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) - this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - - @Before - override def setUp() { - super.setUp() - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers)) - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) - consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestUtils.getSSLBrokerListStrFromServers(servers)) - consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) - consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) - consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range") - - for (i <- 0 until producerCount) - producers += TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), - acks = 1, - enableSSL=true, - trustStoreFile=Some(trustStoreFile)) - for (i <- 0 until consumerCount) - consumers += TestUtils.createNewConsumer(TestUtils.getSSLBrokerListStrFromServers(servers), - groupId = "my-test", - partitionAssignmentStrategy= "range", - enableSSL=true, - trustStoreFile=Some(trustStoreFile)) - - - // create the consumer offset topic - TestUtils.createTopic(zkUtils, ConsumerCoordinator.OffsetsTopicName, - overridingProps.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt, - overridingProps.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, - servers, - servers(0).consumerCoordinator.offsetsTopicConfigs) - - // create the test topic with all the brokers as replicas - TestUtils.createTopic(zkUtils, topic, 1, numServers, this.servers) - } - - @After - override def tearDown() { - producers.foreach(_.close()) - consumers.foreach(_.close()) - super.tearDown() - } - - @Test - def testSimpleConsumption() { - val numRecords = 10000 - sendRecords(numRecords) - assertEquals(0, this.consumers(0).assignment.size) - this.consumers(0).assign(List(tp)) - assertEquals(1, this.consumers(0).assignment.size) - this.consumers(0).seek(tp, 0) - consumeRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0) - } - - @Test - def testAutoOffsetReset() { - sendRecords(1) - this.consumers(0).assign(List(tp)) - consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) - } - - @Test - def testSeek() { - val consumer = this.consumers(0) - val totalRecords = 50L - sendRecords(totalRecords.toInt) - consumer.assign(List(tp)) - - consumer.seekToEnd(tp) - assertEquals(totalRecords, consumer.position(tp)) - assertFalse(consumer.poll(totalRecords).iterator().hasNext) - - consumer.seekToBeginning(tp) - assertEquals(0, consumer.position(tp), 0) - consumeRecords(consumer, numRecords = 1, startingOffset = 0) - - val mid = totalRecords / 2 - consumer.seek(tp, mid) - assertEquals(mid, consumer.position(tp)) - consumeRecords(consumer, numRecords = 1, startingOffset = mid.toInt) - } - - @Test - def testGroupConsumption() { - sendRecords(10) - this.consumers(0).subscribe(List(topic)) - consumeRecords(this.consumers(0), numRecords = 1, startingOffset = 0) - } - - @Test - def testPositionAndCommit() { - sendRecords(5) - - // committed() on a partition with no committed offset returns null - assertNull(this.consumers(0).committed(new TopicPartition(topic, 15))) - - // position() on a partition that we aren't subscribed to throws an exception - intercept[IllegalArgumentException] { - this.consumers(0).position(new TopicPartition(topic, 15)) - } - - this.consumers(0).assign(List(tp)) - - assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, this.consumers(0).position(tp)) - this.consumers(0).commitSync() - assertEquals(0L, this.consumers(0).committed(tp).offset) - - consumeRecords(this.consumers(0), 5, 0) - assertEquals("After consuming 5 records, position should be 5", 5L, this.consumers(0).position(tp)) - this.consumers(0).commitSync() - assertEquals("Committed offset should be returned", 5L, this.consumers(0).committed(tp).offset) - - sendRecords(1) - - // another consumer in the same group should get the same position - this.consumers(1).assign(List(tp)) - consumeRecords(this.consumers(1), 1, 5) - } - - @Test - def testPartitionsFor() { - val numParts = 2 - TestUtils.createTopic(zkUtils, "part-test", numParts, 1, this.servers) - val parts = this.consumers(0).partitionsFor("part-test") - assertNotNull(parts) - assertEquals(2, parts.length) - assertNull(this.consumers(0).partitionsFor("non-exist-topic")) - } - - private def sendRecords(numRecords: Int) { - val futures = (0 until numRecords).map { i => - this.producers(0).send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) - } - futures.map(_.get) - } - - private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int) { - val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() - val maxIters = numRecords * 300 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50)) { - records.add(record) - } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } - for (i <- 0 until numRecords) { - val record = records.get(i) - val offset = startingOffset + i - assertEquals(topic, record.topic()) - assertEquals(part, record.partition()) - assertEquals(offset.toLong, record.offset()) - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala deleted file mode 100644 index c22e57a..0000000 --- a/core/src/test/scala/integration/kafka/api/SSLProducerSendTest.scala +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.api - -import java.util.Properties -import java.util.concurrent.TimeUnit -import java.io.File - -import kafka.consumer.SimpleConsumer -import kafka.integration.KafkaServerTestHarness -import kafka.message.Message -import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import org.apache.kafka.clients.producer._ -import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.errors.SerializationException -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.junit.Assert._ -import org.junit.{After, Before, Test} - - -class SSLProducerSendTest extends KafkaServerTestHarness { - val numServers = 2 - val trustStoreFile = File.createTempFile("truststore", ".jks") - - val overridingProps = new Properties() - overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) - - def generateConfigs() = - TestUtils.createBrokerConfigs(numServers, zkConnect, false, enableSSL=true, trustStoreFile=Some(trustStoreFile)).map(KafkaConfig.fromProps(_, overridingProps)) - - private var consumer1: SimpleConsumer = null - private var consumer2: SimpleConsumer = null - - private val topic = "topic" - private val numRecords = 100 - - @Before - override def setUp() { - super.setUp() - - // TODO: we need to migrate to new consumers when 0.9 is final - consumer1 = new SimpleConsumer("localhost", servers(0).boundPort(), 100, 1024*1024, "") - consumer2 = new SimpleConsumer("localhost", servers(1).boundPort(), 100, 1024*1024, "") - - } - - @After - override def tearDown() { - consumer1.close() - consumer2.close() - super.tearDown() - } - - /** - * testSendOffset checks the basic send API behavior - * - * 1. Send with null key/value/partition-id should be accepted; send with null topic should be rejected. - * 2. Last message of the non-blocking send should return the correct offset metadata - */ - @Test - def testSendOffset() { - var sslProducer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile)) - var producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers)) - val partition = new Integer(0) - - object callback extends Callback { - var offset = 0L - def onCompletion(metadata: RecordMetadata, exception: Exception) { - if (exception == null) { - assertEquals(offset, metadata.offset()) - assertEquals(topic, metadata.topic()) - assertEquals(partition, metadata.partition()) - offset += 1 - } else { - fail("Send callback returns the following exception", exception) - } - } - } - - try { - // create topic - TestUtils.createTopic(zkUtils, topic, 1, 2, servers) - - // send a normal record - val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, "value".getBytes) - assertEquals("Should have offset 0", 0L, sslProducer.send(record0, callback).get.offset) - - // send a record with null value should be ok - val record1 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, "key".getBytes, null) - assertEquals("Should have offset 1", 1L, sslProducer.send(record1, callback).get.offset) - - // send a record with null key should be ok - val record2 = new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, "value".getBytes) - assertEquals("Should have offset 2", 2L, sslProducer.send(record2, callback).get.offset) - - // send a record with null part id should be ok - val record3 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) - assertEquals("Should have offset 3", 3L, sslProducer.send(record3, callback).get.offset) - - // send a record with null topic should fail - try { - val record4 = new ProducerRecord[Array[Byte],Array[Byte]](null, partition, "key".getBytes, "value".getBytes) - sslProducer.send(record4, callback) - fail("Should not allow sending a record without topic") - } catch { - case iae: IllegalArgumentException => // this is ok - case e: Throwable => fail("Only expecting IllegalArgumentException", e) - } - - // non-blocking send a list of records with sslProducer - for (i <- 1 to numRecords) - sslProducer.send(record0, callback) - // check that all messages have been acked via offset - assertEquals("Should have offset " + numRecords + 4L, numRecords + 4L, sslProducer.send(record0, callback).get.offset) - - //non-blocking send a list of records with plaintext producer - for (i <- 1 to numRecords) - producer.send(record0, callback) - - // check that all messages have been acked via offset - assertEquals("Should have offset " + (numRecords * 2 + 5L), numRecords * 2 + 5L, producer.send(record0, callback).get.offset) - - } finally { - if (sslProducer != null) { - sslProducer.close() - sslProducer = null - } - if (producer != null) { - producer.close() - producer = null - } - - } - } - - /** - * testClose checks the closing behavior - * - * After close() returns, all messages should be sent with correct returned offset metadata - */ - @Test - def testClose() { - var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile)) - try { - // create topic - TestUtils.createTopic(zkUtils, topic, 1, 2, servers) - - // non-blocking send a list of records - val record0 = new ProducerRecord[Array[Byte],Array[Byte]](topic, null, "key".getBytes, "value".getBytes) - for (i <- 1 to numRecords) - producer.send(record0) - val response0 = producer.send(record0) - - // close the producer - producer.close() - producer = null - - // check that all messages have been acked via offset, - // this also checks that messages with same key go to the same partition - assertTrue("The last message should be acked before producer is shutdown", response0.isDone) - assertEquals("Should have offset " + numRecords, numRecords.toLong, response0.get.offset) - - } finally { - if (producer != null) { - producer.close() - producer = null - } - } - } - - /** - * testSendToPartition checks the partitioning behavior - * - * The specified partition-id should be respected - */ - @Test - def testSendToPartition() { - var producer = TestUtils.createNewProducer(TestUtils.getSSLBrokerListStrFromServers(servers), enableSSL=true, trustStoreFile=Some(trustStoreFile)) - try { - // create topic - val leaders = TestUtils.createTopic(zkUtils, topic, 2, 2, servers) - val partition = 1 - - // make sure leaders exist - val leader1 = leaders(partition) - assertTrue("Leader for topic \"topic\" partition 1 should exist", leader1.isDefined) - - val responses = - for (i <- 1 to numRecords) - yield producer.send(new ProducerRecord[Array[Byte],Array[Byte]](topic, partition, null, ("value" + i).getBytes)) - val futures = responses.toList - futures.map(_.get) - for (future <- futures) - assertTrue("Request should have completed", future.isDone) - - // make sure all of them end up in the same partition with increasing offset values - for ((future, offset) <- futures zip (0 until numRecords)) { - assertEquals(offset.toLong, future.get.offset) - assertEquals(topic, future.get.topic) - assertEquals(partition, future.get.partition) - } - - // make sure the fetched messages also respect the partitioning and ordering - val fetchResponse1 = if (leader1.get == configs(0).brokerId) { - consumer1.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) - } else { - consumer2.fetch(new FetchRequestBuilder().addFetch(topic, partition, 0, Int.MaxValue).build()) - } - val messageSet1 = fetchResponse1.messageSet(topic, partition).iterator.toBuffer - assertEquals("Should have fetched " + numRecords + " messages", numRecords, messageSet1.size) - - // TODO: also check topic and partition after they are added in the return messageSet - for (i <- 0 to numRecords - 1) { - assertEquals(new Message(bytes = ("value" + (i + 1)).getBytes), messageSet1(i).message) - assertEquals(i.toLong, messageSet1(i).offset) - } - } finally { - if (producer != null) { - producer.close() - producer = null - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala new file mode 100644 index 0000000..e6f0c2b --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslPlaintextConsumerTest.scala @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarness { + override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala new file mode 100644 index 0000000..4f8512a --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.io.File + +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslSslConsumerTest extends BaseConsumerTest with SaslTestHarness { + override protected def securityProtocol = SecurityProtocol.SASL_SSL + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala new file mode 100644 index 0000000..9575fda --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.io.{BufferedReader, FileWriter, BufferedWriter, File} +import javax.security.auth.login.Configuration + +import kafka.utils.TestUtils +import kafka.zk.ZooKeeperTestHarness +import org.apache.hadoop.minikdc.MiniKdc +import org.apache.kafka.common.security.JaasUtils +import org.junit.{After, Before} + +trait SaslTestHarness extends ZooKeeperTestHarness { + val workDir = new File(System.getProperty("test.dir", "target")) + val kdcConf = MiniKdc.createConf() + val kdc = new MiniKdc(kdcConf, workDir) + + @Before + override def setUp() { + // Clean-up global configuration set by other tests + Configuration.setConfiguration(null) + val keytabFile = TestUtils.tempFile() + val jaasFile = TestUtils.tempFile() + + val writer = new BufferedWriter(new FileWriter(jaasFile)) + val source = io.Source.fromInputStream( + Thread.currentThread().getContextClassLoader.getResourceAsStream("kafka_jaas.conf"), "UTF-8") + if (source == null) + throw new IllegalStateException("Could not load `kaas_jaas.conf`, make sure it is in the classpath") + + for (line <- source.getLines) { + val replaced = line.replaceAll("\\$keytab-location", keytabFile.getAbsolutePath) + writer.write(replaced) + writer.newLine() + } + writer.close() + source.close() + + kdc.start() + kdc.createPrincipal(keytabFile, "client", "kafka/localhost") + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile.getAbsolutePath) + super.setUp + } + + @After + override def tearDown() { + super.tearDown + kdc.stop() + System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + Configuration.setConfiguration(null) + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala new file mode 100644 index 0000000..1d13d88 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SslConsumerTest.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import java.io.File + +import org.apache.kafka.common.protocol.SecurityProtocol + +class SslConsumerTest extends BaseConsumerTest { + override protected def securityProtocol = SecurityProtocol.SSL + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala new file mode 100644 index 0000000..4d9189c --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/SslProducerSendTest.scala @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.api + +import java.io.File + +import org.apache.kafka.common.protocol.SecurityProtocol + +class SslProducerSendTest extends BaseProducerSendTest { + override protected def securityProtocol = SecurityProtocol.SSL + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala index 3cf4dae..05dc0bc 100644 --- a/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/BaseTopicMetadataTest.scala @@ -39,17 +39,23 @@ abstract class BaseTopicMetadataTest extends ZooKeeperTestHarness { var adHocConfigs: Seq[KafkaConfig] = null val numConfigs: Int = 4 - /* If this is `Some`, SSL will be enabled */ + // This should be defined if `securityProtocol` uses SSL (eg SSL, SASL_SSL) protected def trustStoreFile: Option[File] + protected def securityProtocol: SecurityProtocol @Before override def setUp() { super.setUp() - val props = createBrokerConfigs(numConfigs, zkConnect, enableSSL = trustStoreFile.isDefined, trustStoreFile = trustStoreFile) + val props = createBrokerConfigs(numConfigs, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile) val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps) adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases server1 = TestUtils.createServer(configs.head) - brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT)) + brokerEndPoints = Seq( + // We are using the Scala clients and they don't support SSL. Once we move to the Java ones, we should use + // `securityProtocol` instead of PLAINTEXT below + new BrokerEndPoint(server1.config.brokerId, server1.config.hostName, server1.boundPort(SecurityProtocol.PLAINTEXT)) + ) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index bca0dcc..26b86f7 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -17,12 +17,14 @@ package kafka.integration +import java.io.File import java.util.Arrays import kafka.common.KafkaException import kafka.server._ import kafka.utils.{CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{After, Before} import scala.collection.mutable.Buffer @@ -52,13 +54,16 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { def bootstrapUrl = servers.map(s => s.config.hostName + ":" + s.boundPort()).mkString(",") + protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT + protected def trustStoreFile: Option[File] = None + @Before override def setUp() { super.setUp if(configs.size <= 0) throw new KafkaException("Must supply at least one server config.") servers = configs.map(TestUtils.createServer(_)).toBuffer - brokerList = TestUtils.getBrokerListStrFromServers(servers) + brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol) alive = new Array[Boolean](servers.length) Arrays.fill(alive, true) } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala index 176d251..55c12b5 100644 --- a/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PlaintextTopicMetadataTest.scala @@ -17,7 +17,10 @@ package kafka.integration +import org.apache.kafka.common.protocol.SecurityProtocol + class PlaintextTopicMetadataTest extends BaseTopicMetadataTest { + protected def securityProtocol = SecurityProtocol.PLAINTEXT protected def trustStoreFile = None } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala new file mode 100644 index 0000000..11d6da4 --- /dev/null +++ b/core/src/test/scala/unit/kafka/integration/SaslPlaintextTopicMetadataTest.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.integration + +import kafka.api.SaslTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslPlaintextTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness { + protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + protected def trustStoreFile = None +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala new file mode 100644 index 0000000..ea15419 --- /dev/null +++ b/core/src/test/scala/unit/kafka/integration/SaslSslTopicMetadataTest.scala @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.integration + +import java.io.File + +import kafka.api.SaslTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslSslTopicMetadataTest extends BaseTopicMetadataTest with SaslTestHarness { + protected def securityProtocol = SecurityProtocol.SASL_SSL + protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala index 5ff9f35..ee73457 100644 --- a/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/integration/SslTopicMetadataTest.scala @@ -19,6 +19,9 @@ package kafka.integration import java.io.File +import org.apache.kafka.common.protocol.SecurityProtocol + class SslTopicMetadataTest extends BaseTopicMetadataTest { - protected def trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + protected def securityProtocol = SecurityProtocol.SSL + protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6f07a7a..b0cb97e 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -221,7 +221,8 @@ class SocketServerTest extends JUnitSuite { @Test def testSSLSocketServer(): Unit = { val trustStoreFile = File.createTempFile("truststore", ".jks") - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0, enableSSL = true, trustStoreFile = Some(trustStoreFile)) + val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, enableSsl = true, + trustStoreFile = Some(trustStoreFile)) overrideProps.put("listeners", "SSL://localhost:0") val serverMetrics = new Metrics http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala index ade110d..5ecc2c0 100644 --- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala @@ -19,6 +19,7 @@ package kafka.server import java.io.File +import org.apache.kafka.common.protocol.SecurityProtocol import org.junit.{Test, After, Before} import kafka.zk.ZooKeeperTestHarness import kafka.utils.TestUtils._ @@ -32,15 +33,16 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness { val topic1 = "foo" val topic2 = "bar" - /* If this is `Some`, SSL will be enabled */ + // This should be defined if `securityProtocol` uses SSL (eg SSL, SASL_SSL) protected def trustStoreFile: Option[File] + protected def securityProtocol: SecurityProtocol @Before override def setUp() { super.setUp() - brokers = createBrokerConfigs(2, zkConnect, enableControlledShutdown = false, enableSSL = trustStoreFile.isDefined, trustStoreFile = trustStoreFile) - .map(KafkaConfig.fromProps) - .map(TestUtils.createServer(_)) + val props = createBrokerConfigs(2, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile) + brokers = props.map(KafkaConfig.fromProps).map(TestUtils.createServer(_)) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 42c1199..c9f2540 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -507,6 +507,14 @@ class KafkaConfigTest { case KafkaConfig.SSLClientAuthProp => // ignore string case KafkaConfig.SSLCipherSuitesProp => // ignore string + //Sasl Configs + case KafkaConfig.SaslKerberosServiceNameProp => // ignore string + case KafkaConfig.SaslKerberosKinitCmdProp => + case KafkaConfig.SaslKerberosTicketRenewWindowFactorProp => + case KafkaConfig.SaslKerberosTicketRenewJitterProp => + case KafkaConfig.SaslKerberosMinTimeBeforeReloginProp => + case KafkaConfig.AuthToLocalProp => // ignore string + case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") } }) http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala index 871e49b..b160481 100644 --- a/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/PlaintextReplicaFetchTest.scala @@ -17,6 +17,9 @@ package kafka.server +import org.apache.kafka.common.protocol.SecurityProtocol + class PlaintextReplicaFetchTest extends BaseReplicaFetchTest { + protected def securityProtocol = SecurityProtocol.PLAINTEXT protected def trustStoreFile = None } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala new file mode 100644 index 0000000..740db37 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/SaslPlaintextReplicaFetchTest.scala @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.SaslTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslPlaintextReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness { + protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT + protected def trustStoreFile = None +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala new file mode 100644 index 0000000..1bcf8ac --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/SaslSslReplicaFetchTest.scala @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.io.File + +import kafka.api.SaslTestHarness +import org.apache.kafka.common.protocol.SecurityProtocol + +class SaslSslReplicaFetchTest extends BaseReplicaFetchTest with SaslTestHarness { + protected def securityProtocol = SecurityProtocol.SASL_SSL + protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala index 9858052..dad2285 100644 --- a/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/SslReplicaFetchTest.scala @@ -19,6 +19,9 @@ package kafka.server import java.io.File +import org.apache.kafka.common.protocol.SecurityProtocol + class SslReplicaFetchTest extends BaseReplicaFetchTest { - protected def trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + protected def securityProtocol = SecurityProtocol.SSL + protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) } http://git-wip-us.apache.org/repos/asf/kafka/blob/403158b5/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 1a0a7dc..46c88a3 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -30,7 +30,7 @@ import kafka.security.auth.{Resource, Authorizer, Acl} import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.utils.Utils._ -import collection.mutable.ListBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} import org.I0Itec.zkclient.{ZkClient, ZkConnection} @@ -52,6 +52,7 @@ import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer} import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.common.network.Mode import org.apache.kafka.common.security.ssl.SSLFactory import org.apache.kafka.common.config.SSLConfigs import org.apache.kafka.test.TestSSLUtils @@ -137,37 +138,66 @@ object TestUtils extends Logging { } /** - * Create a test config for the given node id + * Create a test config for the provided parameters. + * + * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled. */ def createBrokerConfigs(numConfigs: Int, zkConnect: String, enableControlledShutdown: Boolean = true, enableDeleteTopic: Boolean = false, - enableSSL: Boolean = false, - trustStoreFile: Option[File] = None): Seq[Properties] = { - (0 until numConfigs).map(node => createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, enableSSL = enableSSL, trustStoreFile = trustStoreFile)) - } - - def getBrokerListStrFromServers(servers: Seq[KafkaServer]): String = { - servers.map(s => formatAddress(s.config.hostName, s.boundPort())).mkString(",") + interBrokerSecurityProtocol: Option[SecurityProtocol] = None, + trustStoreFile: Option[File] = None, + enablePlaintext: Boolean = true, + enableSsl: Boolean = false, + enableSaslPlaintext: Boolean = false, + enableSaslSsl: Boolean = false): Seq[Properties] = { + (0 until numConfigs).map { node => + createBrokerConfig(node, zkConnect, enableControlledShutdown, enableDeleteTopic, RandomPort, + interBrokerSecurityProtocol, trustStoreFile, enablePlaintext = enablePlaintext, enableSsl = enableSsl, + enableSaslPlaintext = enableSaslPlaintext, enableSaslSsl = enableSaslSsl) + } } - def getSSLBrokerListStrFromServers(servers: Seq[KafkaServer]): String = { - servers.map(s => formatAddress(s.config.hostName, s.boundPort(SecurityProtocol.SSL))).mkString(",") + def getBrokerListStrFromServers(servers: Seq[KafkaServer], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): String = { + servers.map(s => formatAddress(s.config.hostName, s.boundPort(protocol))).mkString(",") } /** - * Create a test config for the given node id - */ + * Create a test config for the provided parameters. + * + * Note that if `interBrokerSecurityProtocol` is defined, the listener for the `SecurityProtocol` will be enabled. + */ def createBrokerConfig(nodeId: Int, zkConnect: String, enableControlledShutdown: Boolean = true, enableDeleteTopic: Boolean = false, - port: Int = RandomPort, enableSSL: Boolean = false, sslPort: Int = RandomPort, trustStoreFile: Option[File] = None): Properties = { + port: Int = RandomPort, + interBrokerSecurityProtocol: Option[SecurityProtocol] = None, + trustStoreFile: Option[File] = None, + enablePlaintext: Boolean = true, + enableSaslPlaintext: Boolean = false, saslPlaintextPort: Int = RandomPort, + enableSsl: Boolean = false, sslPort: Int = RandomPort, + enableSaslSsl: Boolean = false, saslSslPort: Int = RandomPort) + : Properties = { + + def shouldEnable(protocol: SecurityProtocol) = interBrokerSecurityProtocol.fold(false)(_ == protocol) + + val protocolAndPorts = ArrayBuffer[(SecurityProtocol, Int)]() + if (enablePlaintext || shouldEnable(SecurityProtocol.PLAINTEXT)) + protocolAndPorts += SecurityProtocol.PLAINTEXT -> port + if (enableSsl || shouldEnable(SecurityProtocol.SSL)) + protocolAndPorts += SecurityProtocol.SSL -> sslPort + if (enableSaslPlaintext || shouldEnable(SecurityProtocol.SASL_PLAINTEXT)) + protocolAndPorts += SecurityProtocol.SASL_PLAINTEXT -> saslPlaintextPort + if (enableSaslSsl || shouldEnable(SecurityProtocol.SASL_SSL)) + protocolAndPorts += SecurityProtocol.SASL_SSL -> saslSslPort + + val listeners = protocolAndPorts.map { case (protocol, port) => + s"${protocol.name}://localhost:$port" + }.mkString(",") + val props = new Properties - var listeners: String = "PLAINTEXT://localhost:"+port.toString if (nodeId >= 0) props.put("broker.id", nodeId.toString) - if (enableSSL) - listeners = listeners + "," + "SSL://localhost:"+sslPort.toString props.put("listeners", listeners) props.put("log.dir", TestUtils.tempDir().getAbsolutePath) props.put("zookeeper.connect", zkConnect) @@ -176,9 +206,14 @@ object TestUtils extends Logging { props.put("controlled.shutdown.enable", enableControlledShutdown.toString) props.put("delete.topic.enable", enableDeleteTopic.toString) props.put("controlled.shutdown.retry.backoff.ms", "100") - if (enableSSL) { - props.putAll(addSSLConfigs(SSLFactory.Mode.SERVER, true, trustStoreFile, "server"+nodeId)) + + if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) + props.putAll(sslConfigs(Mode.SERVER, true, trustStoreFile, s"server$nodeId")) + + interBrokerSecurityProtocol.foreach { protocol => + props.put(KafkaConfig.InterBrokerSecurityProtocolProp, protocol.name) } + props.put("port", port.toString) props } @@ -404,28 +439,41 @@ object TestUtils extends Logging { bufferSize: Long = 1024L * 1024L, retries: Int = 0, lingerMs: Long = 0, - enableSSL: Boolean = false, - trustStoreFile: Option[File] = None) : KafkaProducer[Array[Byte],Array[Byte]] = { + securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, + trustStoreFile: Option[File] = None, + props: Option[Properties] = None) : KafkaProducer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.producer.ProducerConfig - val producerProps = new Properties() + val producerProps = props.getOrElse(new Properties) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) producerProps.put(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, metadataFetchTimeout.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) - producerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100") - producerProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, "200") - producerProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs.toString) - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer") - if (enableSSL) { - producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL") - producerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "producer")) + + /* Only use these if not already set */ + val defaultProps = Map( + ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100", + ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200", + ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer" + ) + defaultProps.foreach { case (key, value) => + if (!producerProps.containsKey(key)) producerProps.put(key, value) } + + if (usesSslTransportLayer(securityProtocol)) + producerProps.putAll(sslConfigs(Mode.CLIENT, false, trustStoreFile, "producer")) + producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) new KafkaProducer[Array[Byte],Array[Byte]](producerProps) } + private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { + case SecurityProtocol.SSL | SecurityProtocol.SASL_SSL => true + case _ => false + } + /** * Create a new consumer with a few pre-configured properties. */ @@ -435,7 +483,7 @@ object TestUtils extends Logging { partitionFetchSize: Long = 4096L, partitionAssignmentStrategy: String = "blah", sessionTimeout: Int = 30000, - enableSSL: Boolean = false, + securityProtocol: SecurityProtocol, trustStoreFile: Option[File] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = { import org.apache.kafka.clients.consumer.ConsumerConfig @@ -450,10 +498,9 @@ object TestUtils extends Logging { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer") consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partitionAssignmentStrategy) consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout.toString) - if (enableSSL) { - consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL") - consumerProps.putAll(addSSLConfigs(SSLFactory.Mode.CLIENT, false, trustStoreFile, "consumer")) - } + if (usesSslTransportLayer(securityProtocol)) + consumerProps.putAll(sslConfigs(Mode.CLIENT, false, trustStoreFile, "consumer")) + consumerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name) new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps) } @@ -910,19 +957,18 @@ object TestUtils extends Logging { new String(bytes, encoding) } - def addSSLConfigs(mode: SSLFactory.Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String): Properties = { - if (!trustStoreFile.isDefined) { - throw new Exception("enableSSL set to true but no trustStoreFile provided") + def sslConfigs(mode: Mode, clientCert: Boolean, trustStoreFile: Option[File], certAlias: String): Properties = { + + val trustStore = trustStoreFile.getOrElse { + throw new Exception("SSL enabled but no trustStoreFile provided") } + val sslConfigs = { - if (mode == SSLFactory.Mode.SERVER) { - val sslConfigs = TestSSLUtils.createSSLConfig(true, true, mode, trustStoreFile.get, certAlias) - sslConfigs.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.SSL.name) - sslConfigs - } + if (mode == Mode.SERVER) + TestSSLUtils.createSSLConfig(true, true, mode, trustStore, certAlias) else - TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStoreFile.get, certAlias) + TestSSLUtils.createSSLConfig(clientCert, false, mode, trustStore, certAlias) } val sslProps = new Properties()
