Repository: storm
Updated Branches:
  refs/heads/master 8c17b74f4 -> 3aed9165d


STORM-1091: Add tick tuple unit tests for hdfs and hive bolts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8508669
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8508669
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8508669

Branch: refs/heads/master
Commit: c85086693519a87436a0c96c361d0f5e640bc3b5
Parents: 4c2b37c
Author: Aaron Dossett <aaron.doss...@target.com>
Authored: Tue Oct 6 14:55:14 2015 -0500
Committer: Aaron Dossett <aaron.doss...@target.com>
Committed: Tue Oct 6 14:55:14 2015 -0500

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  7 +++
 .../bolt/IntermediateRankingsBoltTest.java      |  2 +-
 .../starter/bolt/RollingCountBoltTest.java      |  2 +-
 .../starter/bolt/TotalRankingsBoltTest.java     |  2 +-
 .../storm/starter/tools/MockTupleHelpers.java   | 40 --------------
 external/storm-hdfs/pom.xml                     | 15 ++++++
 .../apache/storm/hdfs/bolt/TestHdfsBolt.java    | 18 ++++++-
 external/storm-hive/pom.xml                     |  7 +++
 .../apache/storm/hive/bolt/TestHiveBolt.java    | 56 +++++++++++++++++++-
 storm-core/pom.xml                              | 12 +++++
 .../backtype/storm/utils/MockTupleHelpers.java  | 40 ++++++++++++++
 11 files changed, 155 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index c889694..0f82c33 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -74,6 +74,13 @@
     </dependency>
       <dependency>
           <groupId>org.apache.storm</groupId>
+          <artifactId>storm-core</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
           <artifactId>multilang-javascript</artifactId>
           <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
index c296a89..278a513 100644
--- a/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java
@@ -23,10 +23,10 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.MockTupleHelpers;
 import com.google.common.collect.Lists;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import storm.starter.tools.MockTupleHelpers;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
index bc31ba0..ecb1216 100644
--- a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java
@@ -24,8 +24,8 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.MockTupleHelpers;
 import org.testng.annotations.Test;
-import storm.starter.tools.MockTupleHelpers;
 
 import java.util.Map;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
index 49e3d67..a6af931 100644
--- a/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
+++ b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java
@@ -23,9 +23,9 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.MockTupleHelpers;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
-import storm.starter.tools.MockTupleHelpers;
 import storm.starter.tools.Rankings;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
deleted file mode 100644
index b253350..0000000
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.tools;
-
-import backtype.storm.Constants;
-import backtype.storm.tuple.Tuple;
-
-import static org.mockito.Mockito.*;
-
-public final class MockTupleHelpers {
-
-  private MockTupleHelpers() {
-  }
-
-  public static Tuple mockTickTuple() {
-    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
-  }
-
-  public static Tuple mockTuple(String componentId, String streamId) {
-    Tuple tuple = mock(Tuple.class);
-    when(tuple.getSourceComponent()).thenReturn(componentId);
-    when(tuple.getSourceStreamId()).thenReturn(streamId);
-    return tuple;
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index 7a3e954..1765be9 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -51,6 +51,21 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <!--log4j-over-slf4j must be excluded for hadoop-minicluster
+                    see: http://stackoverflow.com/q/20469026/3542091 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-client</artifactId>
             <version>${hadoop.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index 5c73ca2..2f2014c 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -26,7 +26,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import backtype.storm.utils.MockTupleHelpers;
 import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
 import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
 import org.apache.storm.hdfs.bolt.format.FileNameFormat;
@@ -171,6 +171,22 @@ public class TestHdfsBolt {
         Assert.assertEquals(1, countZeroLengthFiles(testRoot));
     }
 
+    @Test
+    public void testTickTuples() throws IOException
+    {
+        HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f);
+        bolt.prepare(new Config(), topologyContext, collector);
+
+        bolt.execute(tuple1);
+
+        //Should not have flushed to file system yet
+        Assert.assertEquals(0, countNonZeroLengthFiles(testRoot));
+
+        bolt.execute(MockTupleHelpers.mockTickTuple());
+
+        //Tick should have flushed it
+        Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
+    }
 
     public void createBaseDirectory(FileSystem passedFs, String path) throws IOException {
         Path p = new Path(path);

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hive/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml
index 6fa4fa9..f842c25 100644
--- a/external/storm-hive/pom.xml
+++ b/external/storm-hive/pom.xml
@@ -45,6 +45,13 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.storm</groupId>
+      <artifactId>storm-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive.hcatalog</groupId>
       <artifactId>hive-hcatalog-streaming</artifactId>
       <version>${hive.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
index 0350c6e..8e79c8c 100644
--- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
+++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java
@@ -26,6 +26,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.MockTupleHelpers;
 
 import org.apache.storm.hive.common.HiveOptions;
 import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper;
@@ -45,7 +46,6 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.Spy;
 import org.mockito.MockitoAnnotations;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
@@ -248,7 +248,6 @@ public class TestHiveBolt {
         bolt = new HiveBolt(hiveOptions);
         bolt.prepare(config, null, new OutputCollector(collector));
         Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
-
         //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA");
         bolt.execute(tuple1);
         verify(collector).ack(tuple1);
@@ -337,6 +336,59 @@ public class TestHiveBolt {
     }
 
     @Test
+    public void testTickTuple()
+    {
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+                .withColumnFields(new Fields(colNames1))
+                .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(2)
+                .withBatchSize(2);
+
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config, null, new OutputCollector(collector));
+
+        Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA");
+        Tuple tuple2 = generateTestTuple(2, "SFO", "San Jose", "CA");
+
+
+        bolt.execute(tuple1);
+
+        //The tick should cause tuple1 to be ack'd
+        Tuple mockTick = MockTupleHelpers.mockTickTuple();
+        bolt.execute(mockTick);
+        verify(collector).ack(tuple1);
+
+        //The second tuple should NOT be ack'd because the batch should be cleared and this will be
+        //the first transaction in the new batch
+        bolt.execute(tuple2);
+        verify(collector, never()).ack(tuple2);
+
+        bolt.cleanup();
+    }
+
+    @Test
+    public void testNoTickEmptyBatches() throws Exception
+    {
+        JsonRecordHiveMapper mapper = new JsonRecordHiveMapper()
+                .withColumnFields(new Fields(colNames1))
+                .withPartitionFields(new Fields(partNames));
+        HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper)
+                .withTxnsPerBatch(2)
+                .withBatchSize(2);
+
+        bolt = new HiveBolt(hiveOptions);
+        bolt.prepare(config, null, new OutputCollector(collector));
+
+        //The tick should NOT cause any acks since the batch was empty
+        Tuple mockTick = MockTupleHelpers.mockTickTuple();
+        bolt.execute(mockTick);
+        verifyZeroInteractions(collector);
+
+        bolt.cleanup();
+    }
+
+    @Test
     public void testMultiPartitionTuples()
         throws Exception {
         DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 7dc5c03..940ab45 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -783,6 +783,18 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.6</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
     </build>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java b/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java
new file mode 100644
index 0000000..a78a168
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+import org.mockito.Mockito;
+
+public final class MockTupleHelpers {
+
+  private MockTupleHelpers() {
+  }
+
+  public static Tuple mockTickTuple() {
+    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+  }
+
+  public static Tuple mockTuple(String componentId, String streamId) {
+    Tuple tuple = Mockito.mock(Tuple.class);
+    Mockito.when(tuple.getSourceComponent()).thenReturn(componentId);
+    Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId);
+    return tuple;
+  }
+}

Reply via email to