http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java index 9a23b0f..0ee796b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java @@ -17,6 +17,8 @@ package org.apache.flink.storm.wrappers; +import org.apache.flink.storm.util.AbstractTest; + import org.apache.storm.generated.Bolt; import org.apache.storm.generated.SpoutSpec; import org.apache.storm.generated.StateSpoutSpec; @@ -24,13 +26,11 @@ import org.apache.storm.generated.StormTopology; import org.apache.storm.metric.api.ICombiner; import org.apache.storm.metric.api.IMetric; import org.apache.storm.metric.api.IReducer; -import org.apache.flink.storm.util.AbstractTest; import org.junit.Test; import java.util.HashMap; - -/* +/** * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here, * because those are tested in StormWrapperSetupHelperTest. */
http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java index 94a88fe..d6575d8 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java @@ -17,14 +17,18 @@ package org.apache.flink.storm.wrappers; +import org.apache.flink.storm.util.AbstractTest; + import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; -import org.apache.flink.storm.util.AbstractTest; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; +/** + * Tests for the SetupOutputFieldsDeclarer. + */ public class SetupOutputFieldsDeclarerTest extends AbstractTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java index eb91c63..b91871a 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java @@ -17,11 +17,11 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.tuple.Values; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; + +import org.apache.storm.tuple.Values; import org.junit.Assert; import org.junit.Test; @@ -31,6 +31,9 @@ import java.util.List; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +/** + * Tests for the SpoutCollector. + */ public class SpoutCollectorTest extends AbstractTest { @SuppressWarnings({ "rawtypes", "unchecked" }) http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java index 265e705..e6d861b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java @@ -17,10 +17,6 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.tuple.Fields; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; @@ -31,6 +27,11 @@ import org.apache.flink.storm.util.StormConfig; import org.apache.flink.storm.util.TestDummySpout; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.tuple.Fields; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,6 +52,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** + * Tests for the SpoutWrapper. + */ @RunWith(PowerMockRunner.class) @PrepareForTest(WrapperSetupHelper.class) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"}) http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java index 5e6c160..2ff6c45 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java @@ -17,15 +17,15 @@ package org.apache.flink.storm.wrappers; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.storm.util.AbstractTest; + import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.MessageId; import org.apache.storm.tuple.Values; - -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.tuple.Tuple5; -import org.apache.flink.storm.util.AbstractTest; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,6 +35,9 @@ import java.util.List; import static org.mockito.Mockito.mock; +/** + * Tests for the StormTuple. + */ public class StormTupleTest extends AbstractTest { private static final String fieldName = "fieldName"; private static final String fieldNamePojo = "member"; @@ -638,8 +641,8 @@ public class StormTupleTest extends AbstractTest { tuple.setField(value, index); ArrayList<String> attributeNames = new ArrayList<String>(arity); - for(int i = 0; i < arity; ++i) { - if(i == index) { + for (int i = 0; i < arity; ++i) { + if (i == index) { attributeNames.add(fieldName); } else { attributeNames.add("" + i); @@ -685,7 +688,7 @@ public class StormTupleTest extends AbstractTest { Assert.assertSame(messageId, stormTuple.getMessageId()); } - public static class TestPojoMember<T> { + private static class TestPojoMember<T> { public T member; public TestPojoMember(T value) { @@ -693,7 +696,7 @@ public class StormTupleTest extends AbstractTest { } } - public static class TestPojoGetter<T> { + private static class TestPojoGetter<T> { private T member; public TestPojoGetter(T value) { http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java index 5f38705..3118d6b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java @@ -18,6 +18,7 @@ package org.apache.flink.storm.wrappers; import org.apache.flink.storm.util.AbstractTest; + import org.apache.storm.topology.IComponent; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.IRichSpout; @@ -37,6 +38,9 @@ import java.util.HashSet; import static java.util.Collections.singleton; import static org.mockito.Mockito.mock; +/** + * Tests for the WrapperSetupHelper. + */ @RunWith(PowerMockRunner.class) @PrepareForTest(WrapperSetupHelper.class) @PowerMockIgnore({"javax.*", "org.apache.log4j.*"}) http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java index 00173df..2b0b275 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupInLocalClusterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.storm.util.TestDummyBolt; import org.apache.flink.storm.util.TestDummySpout; import org.apache.flink.storm.util.TestSink; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.ComponentCommon; @@ -36,11 +37,18 @@ import org.apache.storm.utils.Utils; import org.junit.Assert; import org.junit.Test; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +/** + * Tests for the setup of wrappers in a local cluster. + */ public class WrapperSetupInLocalClusterTest extends AbstractTest { @Test @@ -73,17 +81,17 @@ public class WrapperSetupInLocalClusterTest extends AbstractTest { builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1"); builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) - .shuffleGrouping("bolt1", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.groupingStreamId) - .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); + .shuffleGrouping("bolt1", TestDummyBolt.GROUPING_STREAM_ID) + .shuffleGrouping("bolt1", TestDummyBolt.SHUFFLE_STREAM_ID) + .shuffleGrouping("bolt2", TestDummyBolt.GROUPING_STREAM_ID) + .shuffleGrouping("bolt2", TestDummyBolt.SHUFFLE_STREAM_ID); LocalCluster cluster = new LocalCluster(); Config c = new Config(); c.setNumAckers(0); cluster.submitTopology("test", c, builder.createTopology()); - while (TestSink.result.size() != 8) { + while (TestSink.RESULT.size() != 8) { Utils.sleep(100); } cluster.shutdown(); @@ -92,7 +100,7 @@ public class WrapperSetupInLocalClusterTest extends AbstractTest { Set<Integer> taskIds = new HashSet<Integer>(); - for (TopologyContext expectedContext : TestSink.result) { + for (TopologyContext expectedContext : TestSink.RESULT) { final String thisComponentId = expectedContext.getThisComponentId(); int index = taskCounter.get(thisComponentId); @@ -162,14 +170,14 @@ public class WrapperSetupInLocalClusterTest extends AbstractTest { List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId); List<Integer> tasks = topologyContext.getComponentTasks(componentId); - Iterator<Integer> p_it = possibleTasks.iterator(); - Iterator<Integer> t_it = tasks.iterator(); - while(p_it.hasNext()) { - Assert.assertTrue(t_it.hasNext()); - Assert.assertNull(taskToComponents.put(p_it.next(), componentId)); - Assert.assertTrue(allTaskIds.add(t_it.next())); + Iterator<Integer> pIt = possibleTasks.iterator(); + Iterator<Integer> tIt = tasks.iterator(); + while (pIt.hasNext()) { + Assert.assertTrue(tIt.hasNext()); + Assert.assertNull(taskToComponents.put(pIt.next(), componentId)); + Assert.assertTrue(allTaskIds.add(tIt.next())); } - Assert.assertFalse(t_it.hasNext()); + Assert.assertFalse(tIt.hasNext()); } Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent()); http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm/src/test/resources/log4j-test.properties index 0b686e5..881dc06 100644 --- a/flink-contrib/flink-storm/src/test/resources/log4j-test.properties +++ b/flink-contrib/flink-storm/src/test/resources/log4j-test.properties @@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender # A1 uses PatternLayout. log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n