Repository: storm Updated Branches: refs/heads/master e4f05c219 -> d6b69a7ca
STORM-2353: Replace kafka-unit by kafka_2.11 and kafka-clients to test kafka-clients:0.10.1.1 Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/94738460 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/94738460 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/94738460 Branch: refs/heads/master Commit: 94738460ca3f498bdac2b610af350692cfb5a525 Parents: e4f05c2 Author: lukess <luke.skywalker....@gmail.com> Authored: Thu Feb 9 16:35:01 2017 -0800 Committer: lukess <luke.skywalker....@gmail.com> Committed: Wed Mar 8 23:29:08 2017 -0800 ---------------------------------------------------------------------- external/storm-kafka-client/pom.xml | 36 +++++- .../java/org/apache/storm/kafka/KafkaUnit.java | 111 +++++++++++++++++++ .../org/apache/storm/kafka/KafkaUnitRule.java | 46 ++++++++ .../kafka/spout/SingleTopicKafkaSpoutTest.java | 26 +++-- 4 files changed, 202 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 0fdb64d..2ff98c1 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -88,15 +88,41 @@ <scope>test</scope> </dependency> <dependency> - <groupId>info.batey.kafka</groupId> - <artifactId>kafka-unit</artifactId> - <version>0.6</version> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${storm.kafka.client.version}</version> + <classifier>test</classifier> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${storm.kafka.client.version}</version> + <classifier>test</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.11</artifactId> + <version>${storm.kafka.client.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java new file mode 100644 index 0000000..4bf1cea --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import kafka.admin.AdminUtils; +import kafka.admin.RackAwareMode; +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; +import kafka.utils.*; +import kafka.zk.EmbeddedZookeeper; +import org.I0Itec.zkclient.ZkClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + +public class KafkaUnit { + private KafkaServer kafkaServer; + private EmbeddedZookeeper zkServer; + private ZkUtils zkUtils; + private KafkaProducer<String, String> producer; + private static final String ZK_HOST = "127.0.0.1"; + private static final String KAFKA_HOST = "127.0.0.1"; + private static final int KAFKA_PORT = 9092; + + public KafkaUnit() { + } + + public void setUp() throws IOException { + // setup ZK + zkServer = new EmbeddedZookeeper(); + String zkConnect = ZK_HOST + ":" + zkServer.port(); + ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); + zkUtils = ZkUtils.apply(zkClient, false); + + // setup Broker + Properties brokerProps = new Properties(); + brokerProps.setProperty("zookeeper.connect", zkConnect); + brokerProps.setProperty("broker.id", "0"); + brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); + brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT)); + KafkaConfig config = new KafkaConfig(brokerProps); + Time mock = new MockTime(); + kafkaServer = TestUtils.createServer(config, mock); + + // setup default Producer + createProducer(); + } + + public void tearDown() { + closeProducer(); + kafkaServer.shutdown(); + zkUtils.close(); + zkServer.shutdown(); + } + + public void createTopic(String topicName) { + AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); + } + + public int getKafkaPort() { + return KAFKA_PORT; + } + + private void createProducer() { + Properties producerProps = new Properties(); + producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producer = new KafkaProducer<>(producerProps); + } + + public void createProducer(Serializer keySerializer, Serializer valueSerializer) { + Properties producerProps = new Properties(); + producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer); + } + + public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException { + producer.send(producerRecord).get(10, TimeUnit.SECONDS); + } + + private void closeProducer() { + producer.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java new file mode 100644 index 0000000..6e90c9d --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.junit.rules.ExternalResource; + +import java.io.IOException; + + +public class KafkaUnitRule extends ExternalResource { + + private final KafkaUnit kafkaUnit; + + public KafkaUnitRule() { + this.kafkaUnit = new KafkaUnit(); + } + + @Override + public void before() throws IOException { + kafkaUnit.setUp(); + } + + @Override + public void after() { + kafkaUnit.tearDown(); + } + + public KafkaUnit getKafkaUnit() { + return this.kafkaUnit; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java index fdc9734..5822125 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java @@ -19,9 +19,9 @@ package org.apache.storm.kafka.spout; import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig; -import info.batey.kafka.unit.KafkaUnitRule; -import kafka.producer.KeyedMessage; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.storm.kafka.KafkaUnitRule; import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; @@ -31,6 +31,8 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; import static org.hamcrest.CoreMatchers.is; @@ -54,6 +56,7 @@ import org.junit.Before; import org.mockito.Captor; import org.mockito.MockitoAnnotations; + public class SingleTopicKafkaSpoutTest { private class SpoutContext { @@ -85,25 +88,24 @@ public class SingleTopicKafkaSpoutTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs); + KafkaSpoutConfig spoutConfig = getKafkaSpoutConfig(kafkaUnitRule.getKafkaUnit().getKafkaPort(), commitOffsetPeriodMs); this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig)); this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy; this.spout = new KafkaSpout<>(spoutConfig, consumerFactory); } - private void populateTopicData(String topicName, int msgCount) { + void populateTopicData(String topicName, int msgCount) throws InterruptedException, ExecutionException, TimeoutException { kafkaUnitRule.getKafkaUnit().createTopic(topicName); - IntStream.range(0, msgCount).forEach(value -> { - KeyedMessage<String, String> keyedMessage = new KeyedMessage<>( - topicName, Integer.toString(value), - Integer.toString(value)); - - kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage); - }); + for (int i = 0; i < msgCount; i++) { + ProducerRecord<String, String> producerRecord = new ProducerRecord<>( + topicName, Integer.toString(i), + Integer.toString(i)); + kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord); + } } - private void initializeSpout(int msgCount) { + private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException { populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount); spout.open(conf, topologyContext, collector); spout.activate();