Repository: storm Updated Branches: refs/heads/master 8c17b74f4 -> 3aed9165d
STORM-1091: Add tick tuple unit tests for hdfs and hive bolts Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8508669 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8508669 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8508669 Branch: refs/heads/master Commit: c85086693519a87436a0c96c361d0f5e640bc3b5 Parents: 4c2b37c Author: Aaron Dossett <aaron.doss...@target.com> Authored: Tue Oct 6 14:55:14 2015 -0500 Committer: Aaron Dossett <aaron.doss...@target.com> Committed: Tue Oct 6 14:55:14 2015 -0500 ---------------------------------------------------------------------- examples/storm-starter/pom.xml | 7 +++ .../bolt/IntermediateRankingsBoltTest.java | 2 +- .../starter/bolt/RollingCountBoltTest.java | 2 +- .../starter/bolt/TotalRankingsBoltTest.java | 2 +- .../storm/starter/tools/MockTupleHelpers.java | 40 -------------- external/storm-hdfs/pom.xml | 15 ++++++ .../apache/storm/hdfs/bolt/TestHdfsBolt.java | 18 ++++++- external/storm-hive/pom.xml | 7 +++ .../apache/storm/hive/bolt/TestHiveBolt.java | 56 +++++++++++++++++++- storm-core/pom.xml | 12 +++++ .../backtype/storm/utils/MockTupleHelpers.java | 40 ++++++++++++++ 11 files changed, 155 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index c889694..0f82c33 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -74,6 +74,13 @@ </dependency> <dependency> <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> <artifactId>multilang-javascript</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java index c296a89..278a513 100644 --- a/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java +++ b/examples/storm-starter/test/jvm/storm/starter/bolt/IntermediateRankingsBoltTest.java @@ -23,10 +23,10 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.MockTupleHelpers; import com.google.common.collect.Lists; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import storm.starter.tools.MockTupleHelpers; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java index bc31ba0..ecb1216 100644 --- a/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java +++ b/examples/storm-starter/test/jvm/storm/starter/bolt/RollingCountBoltTest.java @@ -24,8 +24,8 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.MockTupleHelpers; import org.testng.annotations.Test; -import storm.starter.tools.MockTupleHelpers; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java index 49e3d67..a6af931 100644 --- a/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java +++ b/examples/storm-starter/test/jvm/storm/starter/bolt/TotalRankingsBoltTest.java @@ -23,9 +23,9 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.MockTupleHelpers; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import storm.starter.tools.MockTupleHelpers; import storm.starter.tools.Rankings; import java.util.Map; http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java deleted file mode 100644 index b253350..0000000 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.tools; - -import backtype.storm.Constants; -import backtype.storm.tuple.Tuple; - -import static org.mockito.Mockito.*; - -public final class MockTupleHelpers { - - private MockTupleHelpers() { - } - - public static Tuple mockTickTuple() { - return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); - } - - public static Tuple mockTuple(String componentId, String streamId) { - Tuple tuple = mock(Tuple.class); - when(tuple.getSourceComponent()).thenReturn(componentId); - when(tuple.getSourceStreamId()).thenReturn(streamId); - return tuple; - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml index 7a3e954..1765be9 100644 --- a/external/storm-hdfs/pom.xml +++ b/external/storm-hdfs/pom.xml @@ -51,6 +51,21 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <!--log4j-over-slf4j must be excluded for hadoop-minicluster + see: http://stackoverflow.com/q/20469026/3542091 --> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java index 5c73ca2..2f2014c 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java @@ -26,7 +26,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.TupleImpl; import backtype.storm.tuple.Values; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import backtype.storm.utils.MockTupleHelpers; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; @@ -171,6 +171,22 @@ public class TestHdfsBolt { Assert.assertEquals(1, countZeroLengthFiles(testRoot)); } + @Test + public void testTickTuples() throws IOException + { + HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f); + bolt.prepare(new Config(), topologyContext, collector); + + bolt.execute(tuple1); + + //Should not have flushed to file system yet + Assert.assertEquals(0, countNonZeroLengthFiles(testRoot)); + + bolt.execute(MockTupleHelpers.mockTickTuple()); + + //Tick should have flushed it + Assert.assertEquals(1, countNonZeroLengthFiles(testRoot)); + } public void createBaseDirectory(FileSystem passedFs, String path) throws IOException { Path p = new Path(path); http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hive/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-hive/pom.xml b/external/storm-hive/pom.xml index 6fa4fa9..f842c25 100644 --- a/external/storm-hive/pom.xml +++ b/external/storm-hive/pom.xml @@ -45,6 +45,13 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.hive.hcatalog</groupId> <artifactId>hive-hcatalog-streaming</artifactId> <version>${hive.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java index 0350c6e..8e79c8c 100644 --- a/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java +++ b/external/storm-hive/src/test/java/org/apache/storm/hive/bolt/TestHiveBolt.java @@ -26,6 +26,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.TupleImpl; import backtype.storm.tuple.Values; +import backtype.storm.utils.MockTupleHelpers; import org.apache.storm.hive.common.HiveOptions; import org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper; @@ -45,7 +46,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.Spy; import org.mockito.MockitoAnnotations; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -248,7 +248,6 @@ public class TestHiveBolt { bolt = new HiveBolt(hiveOptions); bolt.prepare(config, null, new OutputCollector(collector)); Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA"); - //Tuple tuple2 = generateTestTuple(2,"SFO","San Jose","CA"); bolt.execute(tuple1); verify(collector).ack(tuple1); @@ -337,6 +336,59 @@ public class TestHiveBolt { } @Test + public void testTickTuple() + { + JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); + HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + .withTxnsPerBatch(2) + .withBatchSize(2); + + bolt = new HiveBolt(hiveOptions); + bolt.prepare(config, null, new OutputCollector(collector)); + + Tuple tuple1 = generateTestTuple(1, "SJC", "Sunnyvale", "CA"); + Tuple tuple2 = generateTestTuple(2, "SFO", "San Jose", "CA"); + + + bolt.execute(tuple1); + + //The tick should cause tuple1 to be ack'd + Tuple mockTick = MockTupleHelpers.mockTickTuple(); + bolt.execute(mockTick); + verify(collector).ack(tuple1); + + //The second tuple should NOT be ack'd because the batch should be cleared and this will be + //the first transaction in the new batch + bolt.execute(tuple2); + verify(collector, never()).ack(tuple2); + + bolt.cleanup(); + } + + @Test + public void testNoTickEmptyBatches() throws Exception + { + JsonRecordHiveMapper mapper = new JsonRecordHiveMapper() + .withColumnFields(new Fields(colNames1)) + .withPartitionFields(new Fields(partNames)); + HiveOptions hiveOptions = new HiveOptions(metaStoreURI,dbName,tblName,mapper) + .withTxnsPerBatch(2) + .withBatchSize(2); + + bolt = new HiveBolt(hiveOptions); + bolt.prepare(config, null, new OutputCollector(collector)); + + //The tick should NOT cause any acks since the batch was empty + Tuple mockTick = MockTupleHelpers.mockTickTuple(); + bolt.execute(mockTick); + verifyZeroInteractions(collector); + + bolt.cleanup(); + } + + @Test public void testMultiPartitionTuples() throws Exception { DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper() http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 7dc5c03..940ab45 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -783,6 +783,18 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> http://git-wip-us.apache.org/repos/asf/storm/blob/c8508669/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java b/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java new file mode 100644 index 0000000..a78a168 --- /dev/null +++ b/storm-core/test/jvm/backtype/storm/utils/MockTupleHelpers.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.utils; + +import backtype.storm.Constants; +import backtype.storm.tuple.Tuple; + +import org.mockito.Mockito; + +public final class MockTupleHelpers { + + private MockTupleHelpers() { + } + + public static Tuple mockTickTuple() { + return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); + } + + public static Tuple mockTuple(String componentId, String streamId) { + Tuple tuple = Mockito.mock(Tuple.class); + Mockito.when(tuple.getSourceComponent()).thenReturn(componentId); + Mockito.when(tuple.getSourceStreamId()).thenReturn(streamId); + return tuple; + } +}