Repository: storm
Updated Branches:
  refs/heads/master e4f05c219 -> d6b69a7ca


STORM-2353: Replace kafka-unit with kafka_2.11 and kafka-clients to test 
kafka-clients:0.10.1.1


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/94738460
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/94738460
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/94738460

Branch: refs/heads/master
Commit: 94738460ca3f498bdac2b610af350692cfb5a525
Parents: e4f05c2
Author: lukess <luke.skywalker....@gmail.com>
Authored: Thu Feb 9 16:35:01 2017 -0800
Committer: lukess <luke.skywalker....@gmail.com>
Committed: Wed Mar 8 23:29:08 2017 -0800

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |  36 +++++-
 .../java/org/apache/storm/kafka/KafkaUnit.java  | 111 +++++++++++++++++++
 .../org/apache/storm/kafka/KafkaUnitRule.java   |  46 ++++++++
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |  26 +++--
 4 files changed, 202 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml 
b/external/storm-kafka-client/pom.xml
index 0fdb64d..2ff98c1 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -88,15 +88,41 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>info.batey.kafka</groupId>
-            <artifactId>kafka-unit</artifactId>
-            <version>0.6</version>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${storm.kafka.client.version}</version>
+            <classifier>test</classifier>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.client.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${storm.kafka.client.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
new file mode 100644
index 0000000..4bf1cea
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.*;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class KafkaUnit {
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkUtils zkUtils;
+    private KafkaProducer<String, String> producer;
+    private static final String ZK_HOST = "127.0.0.1";
+    private static final String KAFKA_HOST = "127.0.0.1";
+    private static final int KAFKA_PORT = 9092;
+
+    public KafkaUnit() {
+    }
+
+    public void setUp() throws IOException {
+        // setup ZK
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZK_HOST + ":" + zkServer.port();
+        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, 
ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", 
Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", 
String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        Time mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // setup default Producer
+        createProducer();
+    }
+
+    public void tearDown() {
+        closeProducer();
+        kafkaServer.shutdown();
+        zkUtils.close();
+        zkServer.shutdown();
+    }
+
+    public void createTopic(String topicName) {
+        AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), 
RackAwareMode.Disabled$.MODULE$);
+    }
+
+    public int getKafkaPort() {
+        return KAFKA_PORT;
+    }
+
+    private void createProducer() {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + 
KAFKA_PORT);
+        producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    public void createProducer(Serializer keySerializer, Serializer 
valueSerializer) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + 
KAFKA_PORT);
+        producer = new KafkaProducer<>(producerProps, keySerializer, 
valueSerializer);
+    }
+
+    public void sendMessage(ProducerRecord producerRecord) throws 
InterruptedException, ExecutionException, TimeoutException {
+        producer.send(producerRecord).get(10, TimeUnit.SECONDS);
+    }
+
+    private void closeProducer() {
+        producer.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
new file mode 100644
index 0000000..6e90c9d
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.rules.ExternalResource;
+
+import java.io.IOException;
+
+
+public class KafkaUnitRule extends ExternalResource {
+
+    private final KafkaUnit kafkaUnit;
+
+    public KafkaUnitRule() {
+        this.kafkaUnit = new KafkaUnit();
+    }
+
+    @Override
+    public void before() throws IOException {
+        kafkaUnit.setUp();
+    }
+
+    @Override
+    public void after() {
+        kafkaUnit.tearDown();
+    }
+
+    public KafkaUnit getKafkaUnit() {
+        return this.kafkaUnit;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/94738460/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index fdc9734..5822125 100644
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -19,9 +19,9 @@ package org.apache.storm.kafka.spout;
 
 import static 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
 
-import info.batey.kafka.unit.KafkaUnitRule;
-import kafka.producer.KeyedMessage;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.kafka.KafkaUnitRule;
 import 
org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
@@ -31,6 +31,8 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -54,6 +56,7 @@ import org.junit.Before;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
+
 public class SingleTopicKafkaSpoutTest {
 
     private class SpoutContext {
@@ -85,25 +88,24 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfig(kafkaUnitRule.getKafkaPort(), commitOffsetPeriodMs);
+        KafkaSpoutConfig spoutConfig = 
getKafkaSpoutConfig(kafkaUnitRule.getKafkaUnit().getKafkaPort(), 
commitOffsetPeriodMs);
         this.consumerSpy = spy(new 
KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
         this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
         this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
     }
 
-    private void populateTopicData(String topicName, int msgCount) {
+    void populateTopicData(String topicName, int msgCount) throws 
InterruptedException, ExecutionException, TimeoutException {
         kafkaUnitRule.getKafkaUnit().createTopic(topicName);
 
-        IntStream.range(0, msgCount).forEach(value -> {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(
-                topicName, Integer.toString(value),
-                Integer.toString(value));
-
-            kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
-        });
+        for (int i = 0; i < msgCount; i++) {
+            ProducerRecord<String, String> producerRecord = new 
ProducerRecord<>(
+                    topicName, Integer.toString(i),
+                    Integer.toString(i));
+            kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
+        }
     }
 
-    private void initializeSpout(int msgCount) {
+    private void initializeSpout(int msgCount) throws InterruptedException, 
ExecutionException, TimeoutException {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
         spout.open(conf, topologyContext, collector);
         spout.activate();

Reply via email to