KAFKA-3612: Added structure for integration tests Author: Eno Thereska <[email protected]>
Reviewers: Ismael Juma, Damian Guy, Michael G. Noll, Guozhang Wang Closes #1260 from enothereska/KAFKA-3612-integration-tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94aee214 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94aee214 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94aee214 Branch: refs/heads/0.10.0 Commit: 94aee2143ed5290d27cdd4072c6ae9bb70a6ba30 Parents: 8407dac Author: Eno Thereska <[email protected]> Authored: Wed Apr 27 16:55:51 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Apr 27 16:55:51 2016 -0700 ---------------------------------------------------------------------- build.gradle | 2 + checkstyle/import-control.xml | 11 ++ .../InternalTopicIntegrationTest.java | 169 +++++++++++++++++ .../utils/EmbeddedSingleNodeKafkaCluster.java | 128 +++++++++++++ .../integration/utils/IntegrationTestUtils.java | 157 +++++++++++++++ .../integration/utils/KafkaEmbedded.java | 189 +++++++++++++++++++ 6 files changed, 656 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index dfa7c40..06c41d5 100644 --- a/build.gradle +++ b/build.gradle @@ -669,6 +669,8 @@ project(':streams') { compile libs.jacksonDatabind // this dependency should be removed after KIP-4 testCompile project(':clients').sourceSets.test.output + testCompile project(':core') + testCompile project(':core').sourceSets.test.output testCompile libs.junit } http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index e94698c..39d4ca3 100644 --- a/checkstyle/import-control.xml +++ 
b/checkstyle/import-control.xml @@ -138,6 +138,17 @@ <allow pkg="com.fasterxml.jackson.databind" /> <allow pkg="org.apache.kafka.connect.json" /> </subpackage> + + <subpackage name="integration"> + <allow pkg="kafka.admin" /> + <allow pkg="kafka.server" /> + <allow pkg="kafka.utils" /> + <allow pkg="kafka.zk" /> + <allow pkg="kafka.log" /> + <allow pkg="scala" /> + <allow pkg="scala.collection" /> + <allow pkg="org.I0Itec.zkclient" /> + </subpackage> <subpackage name="state"> <allow pkg="org.rocksdb" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java new file mode 100644 index 0000000..2a3e767 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; + +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import kafka.admin.AdminUtils; +import kafka.log.LogConfig; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import scala.Tuple2; +import scala.collection.Iterator; +import scala.collection.Map; + +/** + * Tests related to internal topics in streams + */ +public class InternalTopicIntegrationTest { + @ClassRule + public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster(); + private static final String DEFAULT_INPUT_TOPIC = "inputTopic"; + private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic"; + private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; + private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; + + @BeforeClass + public static void startKafkaCluster() throws Exception { + 
cluster.createTopic(DEFAULT_INPUT_TOPIC); + cluster.createTopic(DEFAULT_OUTPUT_TOPIC); + } + + /** + * Validates that any state changelog topics are compacted + * @return true if topics have a valid config, false otherwise + */ + private boolean isUsingCompactionForStateChangelogTopics() { + boolean valid = true; + + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). The topic will exist in + // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. + ZkClient zkClient = new ZkClient( + cluster.zKConnectString(), + DEFAULT_ZK_SESSION_TIMEOUT_MS, + DEFAULT_ZK_CONNECTION_TIMEOUT_MS, + ZKStringSerializer$.MODULE$); + boolean isSecure = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure); + + Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils); + Iterator it = topicConfigs.iterator(); + while (it.hasNext()) { + Tuple2<String, Properties> topicConfig = (Tuple2<String, Properties>) it.next(); + String topic = topicConfig._1; + Properties prop = topicConfig._2; + + // state changelogs should be compacted + if (topic.endsWith(ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX)) { + if (!prop.containsKey(LogConfig.CleanupPolicyProp()) || + !prop.getProperty(LogConfig.CleanupPolicyProp()).equals(LogConfig.Compact())) { + valid = false; + break; + } + } + } + zkClient.close(); + return valid; + } + + @Test + public void shouldCompactTopicsForStateChangelogs() throws Exception { + List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world"); + + // + // Step 1: Configure and start a simple word count topology + // + final Serde<String> stringSerde = Serdes.String(); + final Serde<Long> longSerde = Serdes.Long(); + + Properties streamsConfiguration = new Properties(); + 
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test"); + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString()); + streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams"); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC); + + KStream<String, Long> wordCounts = textLines + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); + } + }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { + @Override + public KeyValue<String, String> apply(String key, String value) { + return new KeyValue<String, String>(value, value); + } + }).countByKey("Counts").toStream(); + + wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC); + + // Remove any state from previous test runs + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + + KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + // + // Step 2: Produce some input data to the input topic. 
+ // + Properties producerConfig = new Properties(); + producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0); + producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig); + + // + // Step 3: Verify the state changelog topics are compact + // + streams.close(); + assertEquals(isUsingCompactionForStateChangelogTopics(), true); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java new file mode 100644 index 0000000..34753ae --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration.utils; + +import kafka.server.KafkaConfig$; +import kafka.zk.EmbeddedZookeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; +import org.junit.rules.ExternalResource; + +/** + * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker. + */ +public class EmbeddedSingleNodeKafkaCluster extends ExternalResource { + + private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class); + private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected + private EmbeddedZookeeper zookeeper = null; + private KafkaEmbedded broker = null; + + /** + * Creates and starts a Kafka cluster. 
+ */ + public void start() throws IOException, InterruptedException { + Properties brokerConfig = new Properties(); + + log.debug("Initiating embedded Kafka cluster startup"); + log.debug("Starting a ZooKeeper instance"); + zookeeper = new EmbeddedZookeeper(); + log.debug("ZooKeeper instance is running at {}", zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString()); + brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT); + + log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp())); + broker = new KafkaEmbedded(brokerConfig); + + log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}", + broker.brokerList(), broker.zookeeperConnect()); + } + + /** + * Stop the Kafka cluster. + */ + public void stop() { + broker.stop(); + zookeeper.shutdown(); + } + + /** + * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format. + * Example: `127.0.0.1:2181`. + * + * You can use this to e.g. tell Kafka brokers how to connect to this instance. + */ + public String zKConnectString() { + return "localhost:" + zookeeper.port(); + } + + /** + * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`. + * + * You can use this to tell Kafka producers how to connect to this cluster. + */ + public String bootstrapServers() { + return broker.brokerList(); + } + + protected void before() throws Throwable { + start(); + } + + protected void after() { + stop(); + } + + /** + * Create a Kafka topic with 1 partition and a replication factor of 1. + * + * @param topic The name of the topic. + */ + public void createTopic(String topic) { + createTopic(topic, 1, 1, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. 
+ * @param replication The replication factor for (the partitions of) this topic. + */ + public void createTopic(String topic, int partitions, int replication) { + createTopic(topic, partitions, replication, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. + */ + public void createTopic(String topic, + int partitions, + int replication, + Properties topicConfig) { + broker.createTopic(topic, partitions, replication, topicConfig); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java new file mode 100644 index 0000000..89fe0c4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration.utils; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Utility functions to make integration testing more convenient. + */ +public class IntegrationTestUtils { + + private static final int UNLIMITED_MESSAGES = -1; + + /** + * Returns up to `maxMessages` message-values from the topic. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @param maxMessages Maximum number of messages to read via the consumer. + * @return The values retrieved via the consumer. 
+ */ + public static <K, V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) { + List<V> returnList = new ArrayList<>(); + List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages); + for (KeyValue<K, V> kv : kvs) { + returnList.add(kv.value); + } + return returnList; + } + + /** + * Returns as many messages as possible from the topic until a (currently hardcoded) timeout is + * reached. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @return The KeyValue elements retrieved via the consumer. + */ + public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig) { + return readKeyValues(topic, consumerConfig, UNLIMITED_MESSAGES); + } + + /** + * Returns up to `maxMessages` key-value records from the given topic, read via a consumer that + * is created from `consumerConfig` and subscribed to that topic. + * + * @param topic Kafka topic to read messages from + * @param consumerConfig Kafka consumer configuration + * @param maxMessages Maximum number of messages to read via the consumer + * @return The KeyValue elements retrieved via the consumer + */ + public static <K, V> List<KeyValue<K, V>> readKeyValues(String topic, Properties consumerConfig, int maxMessages) { + KafkaConsumer<K, V> consumer = new KafkaConsumer<>(consumerConfig); + consumer.subscribe(Collections.singletonList(topic)); + int pollIntervalMs = 100; + int maxTotalPollTimeMs = 2000; + int totalPollTimeMs = 0; + List<KeyValue<K, V>> consumedValues = new ArrayList<>(); + while (totalPollTimeMs < maxTotalPollTimeMs && continueConsuming(consumedValues.size(), maxMessages)) { + totalPollTimeMs += pollIntervalMs; + ConsumerRecords<K, V> records = consumer.poll(pollIntervalMs); + for (ConsumerRecord<K, V> record : records) { + consumedValues.add(new KeyValue<>(record.key(), record.value())); + } + } + consumer.close(); + return consumedValues; + } + + private static boolean 
continueConsuming(int messagesConsumed, int maxMessages) { + return maxMessages <= 0 || messagesConsumed < maxMessages; + } + + /** + * Removes local state stores. Useful to reset state in-between integration test runs. + * + * @param streamsConfiguration Streams configuration settings + */ + public static void purgeLocalStreamsState(Properties streamsConfiguration) throws IOException { + String path = streamsConfiguration.getProperty(StreamsConfig.STATE_DIR_CONFIG); + if (path != null) { + File node = Paths.get(path).normalize().toFile(); + // Only purge state when it's under /tmp. This is a safety net to prevent accidentally + // deleting important local directory trees. + if (node.getAbsolutePath().startsWith("/tmp")) { + Utils.delete(new File(node.getAbsolutePath())); + } + } + } + + /** + * @param topic Kafka topic to write the data records to + * @param records Data records to write to Kafka + * @param producerConfig Kafka producer configuration + * @param <K> Key type of the data records + * @param <V> Value type of the data records + */ + public static <K, V> void produceKeyValuesSynchronously( + String topic, Collection<KeyValue<K, V>> records, Properties producerConfig) + throws ExecutionException, InterruptedException { + Producer<K, V> producer = new KafkaProducer<>(producerConfig); + for (KeyValue<K, V> record : records) { + Future<RecordMetadata> f = producer.send( + new ProducerRecord<>(topic, record.key, record.value)); + f.get(); + } + producer.flush(); + producer.close(); + } + + public static <V> void produceValuesSynchronously( + String topic, Collection<V> records, Properties producerConfig) + throws ExecutionException, InterruptedException { + Collection<KeyValue<Object, V>> keyedRecords = new ArrayList<>(); + for (V value : records) { + KeyValue<Object, V> kv = new KeyValue<>(null, value); + keyedRecords.add(kv); + } + produceKeyValuesSynchronously(topic, keyedRecords, producerConfig); + } + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/kafka/blob/94aee214/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java new file mode 100644 index 0000000..348b46b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration.utils; + + +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; + +import java.io.File; +import java.util.Collections; +import java.util.List; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.KafkaServer; +import kafka.utils.CoreUtils; +import kafka.utils.SystemTime$; +import kafka.utils.TestUtils; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; +import org.junit.rules.TemporaryFolder; +/** + * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092` by + * default. + * + * Requires a running ZooKeeper instance to connect to. + */ +public class KafkaEmbedded { + + private static final Logger log = LoggerFactory.getLogger(KafkaEmbedded.class); + + private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181"; + private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000; + private static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 8 * 1000; + private final Properties effectiveConfig; + private final File logDir; + public final TemporaryFolder tmpFolder; + private final KafkaServer kafka; + + /** + * Creates and starts an embedded Kafka broker. + * @param config Broker configuration settings. Used to modify, for example, on which port the + * broker should listen to. Note that you cannot change the `log.dirs` setting + * currently. 
+ */ + public KafkaEmbedded(Properties config) throws IOException { + tmpFolder = new TemporaryFolder(); + tmpFolder.create(); + logDir = tmpFolder.newFolder(); + effectiveConfig = effectiveConfigFrom(config); + boolean loggingEnabled = true; + KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig, loggingEnabled); + log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK ensemble at {}) ...", + logDir, zookeeperConnect()); + kafka = TestUtils.createServer(kafkaConfig, SystemTime$.MODULE$); + log.debug("Startup of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); + } + + + /** + * Creates the configuration for starting the Kafka broker by merging default values with + * overrides. + * @param initialConfig Broker configuration settings that override the default config. + * @return The defaults overlaid with `initialConfig`; the log directory is always set to `logDir`. + * @throws IOException + */ + private Properties effectiveConfigFrom(Properties initialConfig) throws IOException { + Properties effectiveConfig = new Properties(); + effectiveConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), 0); + effectiveConfig.put(KafkaConfig$.MODULE$.HostNameProp(), "127.0.0.1"); + effectiveConfig.put(KafkaConfig$.MODULE$.PortProp(), "9092"); + effectiveConfig.put(KafkaConfig$.MODULE$.NumPartitionsProp(), 1); + effectiveConfig.put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), true); + effectiveConfig.put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), 1000000); + effectiveConfig.put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), true); + + effectiveConfig.putAll(initialConfig); + effectiveConfig.setProperty(KafkaConfig$.MODULE$.LogDirProp(), logDir.getAbsolutePath()); + return effectiveConfig; + } + + /** + * This broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. + * + * You can use this to tell Kafka producers and consumers how to connect to this instance. 
+ */ + public String brokerList() { + return kafka.config().hostName() + ":" + kafka.boundPort(SecurityProtocol.PLAINTEXT); + } + + + /** + * The ZooKeeper connection string aka `zookeeper.connect`. + */ + public String zookeeperConnect() { + return effectiveConfig.getProperty("zookeeper.connect", DEFAULT_ZK_CONNECT); + } + + /** + * Stop the broker. + */ + public void stop() { + log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); + kafka.shutdown(); + kafka.awaitShutdown(); + log.debug("Removing logs.dir at {} ...", logDir); + List<String> logDirs = Collections.singletonList(logDir.getAbsolutePath()); + CoreUtils.delete(scala.collection.JavaConversions.asScalaBuffer(logDirs).seq()); + tmpFolder.delete(); + log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK ensemble at {}) ...", + brokerList(), zookeeperConnect()); + } + + /** + * Create a Kafka topic with 1 partition and a replication factor of 1. + * + * @param topic The name of the topic. + */ + public void createTopic(String topic) { + createTopic(topic, 1, 1, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (the partitions of) this topic. + */ + public void createTopic(String topic, int partitions, int replication) { + createTopic(topic, partitions, replication, new Properties()); + } + + /** + * Create a Kafka topic with the given parameters. + * + * @param topic The name of the topic. + * @param partitions The number of partitions for this topic. + * @param replication The replication factor for (partitions of) this topic. + * @param topicConfig Additional topic-level configuration settings. 
+ */ + public void createTopic(String topic, + int partitions, + int replication, + Properties topicConfig) { + log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }", + topic, partitions, replication, topicConfig); + + // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then + // createTopic() will only seem to work (it will return without error). The topic will then exist + // only in ZooKeeper and will be returned when listing topics, but Kafka itself does not create the + // topic. + ZkClient zkClient = new ZkClient( + zookeeperConnect(), + DEFAULT_ZK_SESSION_TIMEOUT_MS, + DEFAULT_ZK_CONNECTION_TIMEOUT_MS, + ZKStringSerializer$.MODULE$); + boolean isSecure = false; + ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect()), isSecure); + AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$); + zkClient.close(); + } +} \ No newline at end of file
