http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java new file mode 100644 index 0000000..1891873 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Spout for tests that emits a fixed number of single-attribute tuples and then stays silent.
+ * <p>
+ * The emitted values count down from {@code numberOfOutputTuples - 1} to {@code 0}. The single output attribute is
+ * declared under the name "dummy".
+ */
+public class FiniteTestSpout implements IRichSpout {
+	private static final long serialVersionUID = 7992419478267824279L;
+
+	/** Number of tuples that still have to be emitted. */
+	private int numberOfOutputTuples;
+	/** Collector handed over in {@link #open(Map, TopologyContext, SpoutOutputCollector)}. */
+	private SpoutOutputCollector collector;
+
+	/**
+	 * @param numberOfOutputTuples
+	 *            total number of tuples to emit; for values less than or equal to zero nothing is emitted
+	 */
+	public FiniteTestSpout(final int numberOfOutputTuples) {
+		this.numberOfOutputTuples = numberOfOutputTuples;
+	}
+
+	/**
+	 * Remembers the collector used by {@link #nextTuple()}; {@code conf} and {@code context} are ignored.
+	 */
+	@SuppressWarnings("rawtypes")
+	@Override
+	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+		this.collector = collector;
+	}
+
+	@Override
+	public void close() {/* nothing to do */}
+
+	@Override
+	public void activate() {/* nothing to do */}
+
+	@Override
+	public void deactivate() {/* nothing to do */}
+
+	/**
+	 * Emits the next countdown value, or does nothing once all tuples were emitted.
+	 */
+	@Override
+	public void nextTuple() {
+		// Test before decrementing: an unconditional decrement would keep counting down on every call after the
+		// spout is exhausted, and the int counter would eventually wrap around and start emitting again.
+		if (this.numberOfOutputTuples > 0) {
+			--this.numberOfOutputTuples;
+			this.collector.emit(new Values(Integer.valueOf(this.numberOfOutputTuples)));
+		}
+	}
+
+	@Override
+	public void ack(final Object msgId) {/* nothing to do */}
+
+	@Override
+	public void fail(final Object msgId) {/* nothing to do */}
+
+	/**
+	 * Declares a single output attribute named "dummy".
+	 */
+	@Override
+	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("dummy"));
+	}
+
+	/**
+	 * @return always {@code null}; this spout supplies no configuration map
+	 */
+	@Override
+	public Map<String, Object> getComponentConfiguration() {
+		return null;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java new file mode 100644 index 0000000..8e63563 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+/**
+ * Tests for {@link FlinkOutputFieldsDeclarer}: stream declaration, the derived output types, and the lookup of
+ * grouping field indexes.
+ * <p>
+ * Random choices are drawn from {@code r}, which is not declared in this class (presumably the random generator
+ * provided by {@code AbstractTest} -- confirm there).
+ */
+public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
+
+	/** Asking for the output type of a {@code null} stream id yields {@code null}. */
+	@Test
+	public void testNull() {
+		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
+	}
+
+	/** Runs the declare check for both declaration styles and for 0 to 25 attributes. */
+	@Test
+	public void testDeclare() {
+		for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
+			// NOTE(review): with the bound "j < 2" only a single stream is ever exercised here, although the
+			// loop is labelled "number of streams" -- confirm whether "j <= 2" was intended.
+			for (int j = 1; j < 2; ++j) { // number of streams
+				for (int k = 0; k <= 25; ++k) { // number of attributes
+					this.runDeclareTest(i, j, k);
+				}
+			}
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareSimpleToManyAttributes() {
+		this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareNonDirectToManyAttributes() {
+		this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	// NOTE(review): test cases 2 and 3 below take the same non-direct branch of runDeclareTest as test case 1,
+	// so the following two tests currently repeat testDeclareNonDirectToManyAttributes.
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareDefaultStreamToManyAttributes() {
+		this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testDeclareFullToManyAttributes() {
+		this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
+	}
+
+	/**
+	 * Declares attributes "a0", "a1", ... on one or more streams and checks the resulting output types.
+	 *
+	 * @param testCase
+	 *            0 declares via {@link #declareSimple}; every other value declares via {@link #declareNonDirect}
+	 * @param numberOfStreams
+	 *            number of explicitly named streams ("stream0", "stream1", ...); for a single stream the default
+	 *            stream is used instead whenever the random draw says so
+	 * @param numberOfAttributes
+	 *            number of attributes declared per stream
+	 */
+	private void runDeclareTest(final int testCase, final int numberOfStreams,
+			final int numberOfAttributes) {
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+
+		// null means "declare on the default stream"
+		String[] streams = null;
+		if (numberOfStreams > 1 || r.nextBoolean()) {
+			streams = new String[numberOfStreams];
+			for (int i = 0; i < numberOfStreams; ++i) {
+				streams[i] = "stream" + i;
+			}
+		}
+
+		final String[] attributes = new String[numberOfAttributes];
+		for (int i = 0; i < attributes.length; ++i) {
+			attributes[i] = "a" + i;
+		}
+
+		switch (testCase) {
+		case 0:
+			this.declareSimple(declarer, streams, attributes);
+			break;
+		default:
+			this.declareNonDirect(declarer, streams, attributes);
+		}
+
+		if (streams == null) {
+			streams = new String[] { Utils.DEFAULT_STREAM_ID };
+		}
+
+		for (String stream : streams) {
+			final TypeInformation<?> type = declarer.getOutputType(stream);
+
+			if (numberOfAttributes == 1) {
+				// JUnit expects (expected, actual); the reversed order produced misleading failure messages
+				Assert.assertEquals(GenericTypeInfo.class, type.getClass());
+				Assert.assertEquals(Object.class, type.getTypeClass());
+			} else {
+				Assert.assertEquals(numberOfAttributes, type.getArity());
+				Assert.assertTrue(type.isTupleType());
+			}
+		}
+	}
+
+	/**
+	 * Declares the attributes without a direct flag, either per given stream or on the default stream if
+	 * {@code streams} is {@code null}.
+	 */
+	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+			final String[] attributes) {
+
+		if (streams != null) {
+			for (String stream : streams) {
+				declarer.declareStream(stream, new Fields(attributes));
+			}
+		} else {
+			declarer.declare(new Fields(attributes));
+		}
+	}
+
+	/**
+	 * Declares the attributes with the direct flag explicitly set to {@code false}, either per given stream or
+	 * on the default stream if {@code streams} is {@code null}.
+	 */
+	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
+			final String[] attributes) {
+
+		if (streams != null) {
+			for (String stream : streams) {
+				declarer.declareStream(stream, false, new Fields(attributes));
+			}
+		} else {
+			declarer.declare(false, new Fields(attributes));
+		}
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void testUndeclared() {
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarer.getOutputType("unknownStreamId");
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirect() {
+		new FlinkOutputFieldsDeclarer().declare(true, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testDeclareDirect2() {
+		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+	}
+
+	/**
+	 * Declares 5 to 25 attributes on the default stream, randomly picks a subset of them as grouping fields and
+	 * checks that the declarer reports exactly the positions of the picked attributes, in ascending order.
+	 */
+	@Test
+	public void testGetGroupingFieldIndexes() {
+		final int numberOfAttributes = 5 + this.r.nextInt(21);
+		final String[] attributes = new String[numberOfAttributes];
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			attributes[i] = "a" + i;
+		}
+
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+		declarer.declare(new Fields(attributes));
+
+		final int numberOfKeys = 1 + this.r.nextInt(25);
+		final LinkedList<String> groupingFields = new LinkedList<String>();
+		// a new boolean[] is all false, so only the picked positions need to be written
+		final boolean[] indexes = new boolean[numberOfAttributes];
+
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (this.r.nextInt(26) < numberOfKeys) {
+				groupingFields.add(attributes[i]);
+				indexes[i] = true;
+			}
+		}
+
+		final int[] expectedResult = new int[groupingFields.size()];
+		int j = 0;
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (indexes[i]) {
+				expectedResult[j++] = i;
+			}
+		}
+
+		final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
+				groupingFields);
+
+		Assert.assertArrayEquals(expectedResult, result);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java new file mode 100644 index 0000000..c3cb7d7 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link FlinkStormStreamSelector}.
+ */
+public class FlinkStormStreamSelectorTest {
+
+	/**
+	 * For each stream id set on the tuple, the selector must return exactly that one id. "stream1" is checked a
+	 * second time after "stream2" has been selected in between.
+	 */
+	@Test
+	public void testSelector() {
+		final FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
+		final SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+
+		for (final String expectedStreamId : new String[] { "stream1", "stream2", "stream1" }) {
+			tuple.streamId = expectedStreamId;
+
+			final Iterator<String> selected = selector.select(tuple).iterator();
+			Assert.assertEquals(expectedStreamId, selected.next());
+			Assert.assertFalse(selected.hasNext());
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..bd9ea3f
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor 
license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/**
+ * Checks that each {@link FlinkTopologyContext} operation called below raises an
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
+ * because those are tested in StormWrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+	/**
+	 * Builds a fresh context over a topology without spouts, bolts or state spouts; every other constructor
+	 * argument is {@code null}.
+	 */
+	private static FlinkTopologyContext createContext() {
+		final StormTopology emptyTopology = new StormTopology(new HashMap<String, SpoutSpec>(),
+				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>());
+
+		return new FlinkTopologyContext(emptyTopology, null, null,
+				null, null, null, null, null, null, null, null, null, null, null, null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testAddTaskHook() {
+		createContext().addTaskHook(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetHooks() {
+		createContext().getHooks();
+	}
+
+	// The casts in the three registerMetric tests select the overload under test for the null argument.
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric1() {
+		createContext().registerMetric(null, (ICombiner) null, 0);
+	}
+
+	@SuppressWarnings("rawtypes")
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric2() {
+		createContext().registerMetric(null, (IReducer) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testRegisteredMetric3() {
+		createContext().registerMetric(null, (IMetric) null, 0);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testGetRegisteredMetricByName() {
+		createContext().getRegisteredMetricByName(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetAllSubscribedState() {
+		createContext().setAllSubscribedState(null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState1() {
+		createContext().setSubscribedState(null, null);
+	}
+
+	@Test(expected = UnsupportedOperationException.class)
+	public void testSetSubscribedState2() {
+		createContext().setSubscribedState(null, null, null);
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java index ec48719..b499373 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java @@ -24,6 +24,7 @@ import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; public class TestDummyBolt implements IRichBolt { private static final long serialVersionUID = 6893611247443121322L; @@ -31,12 +32,27 @@ public class TestDummyBolt implements IRichBolt { public final static String shuffleStreamId = "shuffleStream"; public final static String groupingStreamId = "groupingStream"; + private boolean emit = true; + private TopologyContext context; + private OutputCollector collector; + @SuppressWarnings("rawtypes") @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {} + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + this.context = context; + this.collector = collector; + } @Override - public void execute(Tuple input) {} + public void execute(Tuple input) { + if (this.context.getThisTaskIndex() == 0) { + this.collector.emit(shuffleStreamId, input.getValues()); + } + if (this.emit) { + this.collector.emit(groupingStreamId, new Values("bolt", this.context)); + this.emit = false; + } + } @Override public void cleanup() {} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java 
---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java index 62705b8..345ca12 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java @@ -23,6 +23,7 @@ import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class TestDummySpout implements IRichSpout { @@ -30,9 +31,16 @@ public class TestDummySpout implements IRichSpout { public final static String spoutStreamId = "spout-stream"; + private boolean emit = true; + private TopologyContext context; + private SpoutOutputCollector collector; + @SuppressWarnings("rawtypes") @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {} + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.context = context; + this.collector = collector; + } @Override public void close() {} @@ -44,7 +52,12 @@ public class TestDummySpout implements IRichSpout { public void deactivate() {} @Override - public void nextTuple() {} + public void nextTuple() { + if (this.emit) { + this.collector.emit(new Values(this.context)); + this.emit = false; + } + } @Override public void ack(Object msgId) {} 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java index 5699219..c8e5584 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java @@ -16,6 +16,8 @@ */ package org.apache.flink.stormcompatibility.util; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import backtype.storm.task.OutputCollector; @@ -27,12 +29,22 @@ import backtype.storm.tuple.Tuple; public class TestSink implements IRichBolt { private static final long serialVersionUID = 4314871456719370877L; + public final static List<TopologyContext> result = new LinkedList<TopologyContext>(); + @SuppressWarnings("rawtypes") @Override - public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {} + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + result.add(context); + } @Override - public void execute(Tuple input) {} + public void execute(Tuple input) { + if (input.size() == 1) { + result.add((TopologyContext) input.getValue(0)); + } else { + result.add((TopologyContext) input.getValue(1)); + } + } @Override public void cleanup() {} 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java index 381e130..b44e8a1 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java @@ -18,6 +18,8 @@ package org.apache.flink.stormcompatibility.wrappers; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.stormcompatibility.util.FiniteStormSpout; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.junit.Test; @@ -43,6 +45,8 @@ public class FiniteStormSpoutWrapperTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout); wrapper.setRuntimeContext(taskContext); @@ -59,6 +63,8 @@ public class FiniteStormSpoutWrapperTest { final StreamingRuntimeContext 
taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout); wrapper.setRuntimeContext(taskContext); http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java deleted file mode 100644 index eef35cf..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.IRichSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - -import java.util.Map; - -class FiniteTestSpout implements IRichSpout { - private static final long serialVersionUID = 7992419478267824279L; - - private int numberOfOutputTuples; - private SpoutOutputCollector collector; - - public FiniteTestSpout(final int numberOfOutputTuples) { - this.numberOfOutputTuples = numberOfOutputTuples; - } - - @SuppressWarnings("rawtypes") - @Override - public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { - this.collector = collector; - } - - @Override - public void close() {/* nothing to do */} - - @Override - public void activate() {/* nothing to do */} - - @Override - public void deactivate() {/* nothing to do */} - - @Override - public void nextTuple() { - if (--this.numberOfOutputTuples >= 0) { - this.collector.emit(new Values(new Integer(this.numberOfOutputTuples))); - } - } - - @Override - public void ack(final Object msgId) {/* nothing to do */} - - @Override - public void fail(final Object msgId) {/* nothing to do */} - - @Override - public void declareOutputFields(final OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("dummy")); - } - - @Override - public Map<String, Object> getComponentConfiguration() { - return null; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java deleted file mode 100644 index c0a6ed3..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.Iterator; - -import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector; -import org.apache.flink.stormcompatibility.util.SplitStreamType; -import org.junit.Assert; -import org.junit.Test; - -public class FlinkStormStreamSelectorTest { - - @Test - public void testSelector() { - FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>(); - SplitStreamType<Object> tuple = new SplitStreamType<Object>(); - Iterator<String> result; - - tuple.streamId = "stream1"; - result = selector.select(tuple).iterator(); - Assert.assertEquals("stream1", result.next()); - Assert.assertFalse(result.hasNext()); - - tuple.streamId = "stream2"; - result = selector.select(tuple).iterator(); - Assert.assertEquals("stream2", result.next()); - Assert.assertFalse(result.hasNext()); - - tuple.streamId = "stream1"; - result = selector.select(tuple).iterator(); - Assert.assertEquals("stream1", result.next()); - Assert.assertFalse(result.hasNext()); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java new file mode 100644 index 0000000..738eb1e --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.wrappers; + +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +import org.apache.flink.stormcompatibility.util.AbstractTest; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; + +public class SetupOutputFieldsDeclarerTest extends AbstractTest { + + @Test + public void testDeclare() { + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); + + int numberOfAttributes = this.r.nextInt(26); + declarer.declare(createSchema(numberOfAttributes)); + Assert.assertEquals(1, declarer.outputSchemas.size()); + Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID) + .intValue()); + + final String sid = "streamId"; + numberOfAttributes = this.r.nextInt(26); + declarer.declareStream(sid, createSchema(numberOfAttributes)); + Assert.assertEquals(2, declarer.outputSchemas.size()); + Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue()); + } + + private Fields createSchema(final int numberOfAttributes) { + final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes); + for (int i = 0; i < numberOfAttributes; ++i) { + schema.add("a" + 
i); + } + return new Fields(schema); + } + + @Test + public void testDeclareDirect() { + new SetupOutputFieldsDeclarer().declare(false, new Fields()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeclareDirectFail() { + new SetupOutputFieldsDeclarer().declare(true, new Fields()); + } + + @Test + public void testDeclareStream() { + new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields()); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareStreamFail() { + new SetupOutputFieldsDeclarer().declareStream(null, new Fields()); + } + + @Test + public void testDeclareFullStream() { + new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields()); + } + + @Test(expected = IllegalArgumentException.class) + public void testDeclareFullStreamFailNonDefaultStream() { + new SetupOutputFieldsDeclarer().declareStream(null, false, new Fields()); + } + + @Test(expected = UnsupportedOperationException.class) + public void testDeclareFullStreamFailDirect() { + new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java index 5cfb151..6817593 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java +++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java @@ -61,9 +61,9 @@ public class StormBoltWrapperTest extends AbstractTest { @Test(expected = IllegalArgumentException.class) public void testWrapperRawType() throws Exception { - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy1", "dummy2")); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] { Utils.DEFAULT_STREAM_ID }); @@ -71,26 +71,26 @@ public class StormBoltWrapperTest extends AbstractTest { @Test(expected = IllegalArgumentException.class) public void testWrapperToManyAttributes1() throws Exception { - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); final String[] schema = new String[26]; for (int i = 0; i < schema.length; ++i) { schema[i] = "a" + i; } declarer.declare(new Fields(schema)); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); new StormBoltWrapper<Object, Object>(mock(IRichBolt.class)); } @Test(expected = IllegalArgumentException.class) public void testWrapperToManyAttributes2() throws Exception { - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); final String[] schema = new String[26]; for (int i = 0; i < schema.length; ++i) { schema[i] = "a" + i; } declarer.declare(new Fields(schema)); - 
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new String[] {}); } @@ -133,12 +133,14 @@ public class StormBoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final IRichBolt bolt = mock(IRichBolt.class); - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields(schema)); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, (Fields) null); wrapper.setup(mock(Output.class), taskContext); @@ -163,6 +165,8 @@ public class StormBoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class)); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final Output output = mock(Output.class); @@ -209,14 +213,17 @@ public class StormBoltWrapperTest extends AbstractTest { final ExecutionConfig taskConfig = mock(ExecutionConfig.class); when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig) - .thenReturn(flinkConfig); + .thenReturn(flinkConfig); final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); 
when(taskContext.getExecutionConfig()).thenReturn(taskConfig); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + final IRichBolt bolt = mock(IRichBolt.class); final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt); @@ -249,8 +256,11 @@ public class StormBoltWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(taskConfig); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final IRichBolt bolt = mock(IRichBolt.class); + final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt); wrapper.setup(mock(Output.class), taskContext); @@ -275,9 +285,9 @@ public class StormBoltWrapperTest extends AbstractTest { public void testClose() throws Exception { final IRichBolt bolt = mock(IRichBolt.class); - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt); 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java index a4eea7e..77f1b05 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java @@ -22,7 +22,9 @@ import backtype.storm.tuple.Fields; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.configuration.Configuration; import org.apache.flink.stormcompatibility.util.AbstractTest; +import org.apache.flink.stormcompatibility.util.FiniteTestSpout; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.junit.Assert; @@ -46,12 +48,14 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest { @SuppressWarnings("unchecked") @Test public void testRunExecuteFixedNumber() throws Exception { - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy")); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + 
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final IRichSpout spout = mock(IRichSpout.class); final int numberOfCalls = this.r.nextInt(50); @@ -73,6 +77,8 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final IRichSpout spout = new FiniteTestSpout(numberOfCalls); final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>( @@ -94,11 +100,12 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest { StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final IRichSpout spout = new FiniteTestSpout(numberOfCalls); final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>( spout); - spoutWrapper.setRuntimeContext(taskContext); spoutWrapper.cancel(); final TestContext collector = new TestContext(); http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java deleted file mode 100644 index 561939f..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.wrappers; - -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -import org.apache.flink.stormcompatibility.util.AbstractTest; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; - -public class StormOutputFieldsDeclarerTest extends AbstractTest { - - @Test - public void testDeclare() { - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); - - int numberOfAttributes = this.r.nextInt(26); - declarer.declare(createSchema(numberOfAttributes)); - Assert.assertEquals(1, declarer.outputSchemas.size()); - Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID) - .intValue()); - - final String sid = "streamId"; - numberOfAttributes = this.r.nextInt(26); - declarer.declareStream(sid, createSchema(numberOfAttributes)); - Assert.assertEquals(2, declarer.outputSchemas.size()); - Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue()); - } - - private Fields createSchema(final int numberOfAttributes) { - final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes); - for (int i = 0; i < numberOfAttributes; ++i) { - schema.add("a" + i); - } - return new Fields(schema); - } - - @Test - public void testDeclareDirect() { - new StormOutputFieldsDeclarer().declare(false, new Fields()); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeclareDirectFail() { - new StormOutputFieldsDeclarer().declare(true, new Fields()); - } - - @Test - public void testDeclareStream() { - new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDeclareStreamFail() { - new StormOutputFieldsDeclarer().declareStream(null, new Fields()); - } - - @Test - public void testDeclareFullStream() { - new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new 
Fields()); - } - - @Test(expected = IllegalArgumentException.class) - public void testDeclareFullStreamFailNonDefaultStream() { - new StormOutputFieldsDeclarer().declareStream(null, false, new Fields()); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeclareFullStreamFailDirect() { - new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields()); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java index 04dc48d..f4fb4da 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java @@ -25,9 +25,11 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.configuration.Configuration; import org.apache.flink.stormcompatibility.util.AbstractTest; +import org.apache.flink.stormcompatibility.util.FiniteTestSpout; import org.apache.flink.stormcompatibility.util.StormConfig; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ 
-89,8 +91,12 @@ public class StormSpoutWrapperTest extends AbstractTest { final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class); when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig()); + when(taskContext.getTaskStubParameters()).thenReturn(new Configuration()); + when(taskContext.getTaskName()).thenReturn("name"); final IRichSpout spout = new FiniteTestSpout(numberOfCalls); + + final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout); spoutWrapper.setRuntimeContext(taskContext); http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java index 7497ffc..c799d63 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java @@ -14,29 +14,46 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.flink.stormcompatibility.wrappers; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.generated.ComponentCommon; +import backtype.storm.generated.StormTopology; +import backtype.storm.task.TopologyContext; import backtype.storm.topology.IComponent; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; +import org.apache.flink.stormcompatibility.api.TestTopologyBuilder; import org.apache.flink.stormcompatibility.util.AbstractTest; +import org.apache.flink.stormcompatibility.util.TestDummyBolt; +import org.apache.flink.stormcompatibility.util.TestDummySpout; +import org.apache.flink.stormcompatibility.util.TestSink; +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import com.google.common.collect.Sets; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +@PowerMockIgnore("javax.*") @RunWith(PowerMockRunner.class) @PrepareForTest(StormWrapperSetupHelper.class) public class StormWrapperSetupHelperTest extends AbstractTest { @@ -65,9 +82,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest { boltOrSpout = mock(IRichBolt.class); } - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields("dummy1", "dummy2")); - 
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID })); @@ -83,13 +100,13 @@ public class StormWrapperSetupHelperTest extends AbstractTest { boltOrSpout = mock(IRichBolt.class); } - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); final String[] schema = new String[26]; for (int i = 0; i < schema.length; ++i) { schema[i] = "a" + i; } declarer.declare(new Fields(schema)); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, null); } @@ -119,9 +136,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest { boltOrSpout = mock(IRichBolt.class); } - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); declarer.declare(new Fields(schema)); - PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); + PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); HashMap<String, Integer> attributes = new HashMap<String, Integer>(); attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes); @@ -132,4 +149,167 @@ public class StormWrapperSetupHelperTest extends AbstractTest { .newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null)); } + @Test + public void testCreateTopologyContext() { + HashMap<String, Integer> dops = new HashMap<String, Integer>(); + dops.put("spout1", 1); + dops.put("spout2", 3); + dops.put("bolt1", 1); + dops.put("bolt2", 2); 
+ dops.put("sink", 1); + + HashMap<String, Integer> taskCounter = new HashMap<String, Integer>(); + taskCounter.put("spout1", 0); + taskCounter.put("spout2", 0); + taskCounter.put("bolt1", 0); + taskCounter.put("bolt2", 0); + taskCounter.put("sink", 0); + + HashMap<String, IComponent> operators = new HashMap<String, IComponent>(); + operators.put("spout1", new TestDummySpout()); + operators.put("spout2", new TestDummySpout()); + operators.put("bolt1", new TestDummyBolt()); + operators.put("bolt2", new TestDummyBolt()); + operators.put("sink", new TestSink()); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1")); + builder.setSpout("spout2", (IRichSpout) operators.get("spout2"), dops.get("spout2")); + builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1"); + builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); + builder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) + .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id")) + .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) + .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id")) + .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); + + final int maxRetry = 3; + int counter; + for (counter = 0; counter < maxRetry; ++counter) { + LocalCluster cluster = new LocalCluster(); + Config c = new Config(); + c.setNumAckers(0); + cluster.submitTopology("test", c, builder.createTopology()); + Utils.sleep((counter + 1) * 5000); + cluster.shutdown(); + + if (TestSink.result.size() == 8) { + break; + } + } + Assert.assertTrue(counter < maxRetry); + + TestTopologyBuilder flinkBuilder = new TestTopologyBuilder(); + + flinkBuilder.setSpout("spout1", (IRichSpout) operators.get("spout1"), dops.get("spout1")); + flinkBuilder.setSpout("spout2", (IRichSpout) 
operators.get("spout2"), dops.get("spout2")); + flinkBuilder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1"); + flinkBuilder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2"); + flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), dops.get("sink")) + .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new Fields("id")) + .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId) + .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new Fields("id")) + .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId); + + flinkBuilder.createTopology(); + StormTopology stormTopology = flinkBuilder.getStormTopology(); + + Set<Integer> taskIds = new HashSet<Integer>(); + + for (TopologyContext expectedContext : TestSink.result) { + final String thisComponentId = expectedContext.getThisComponentId(); + int index = taskCounter.get(thisComponentId); + + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + when(context.getTaskName()).thenReturn(thisComponentId); + when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId)); + when(context.getIndexOfThisSubtask()).thenReturn(index); + taskCounter.put(thisComponentId, ++index); + + Config stormConfig = new Config(); + stormConfig.put(StormWrapperSetupHelper.TOPOLOGY_NAME, "test"); + + TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext( + context, operators.get(thisComponentId), stormTopology, stormConfig); + + ComponentCommon expcetedCommon = expectedContext.getComponentCommon(thisComponentId); + ComponentCommon common = topologyContext.getComponentCommon(thisComponentId); + + Assert.assertNull(topologyContext.getCodeDir()); + Assert.assertNull(common.get_json_conf()); + Assert.assertNull(topologyContext.getExecutorData(null)); + Assert.assertNull(topologyContext.getPIDDir()); + Assert.assertNull(topologyContext.getResource(null)); + 
Assert.assertNull(topologyContext.getSharedExecutor()); + Assert.assertNull(expectedContext.getTaskData(null)); + Assert.assertNull(topologyContext.getThisWorkerPort()); + + Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId())); + + Assert.assertEquals(expcetedCommon.get_inputs(), common.get_inputs()); + Assert.assertEquals(expcetedCommon.get_parallelism_hint(), common.get_parallelism_hint()); + Assert.assertEquals(expcetedCommon.get_streams(), common.get_streams()); + Assert.assertEquals(expectedContext.getComponentIds(), topologyContext.getComponentIds()); + Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId), + topologyContext.getComponentStreams(thisComponentId)); + Assert.assertEquals(thisComponentId, topologyContext.getThisComponentId()); + Assert.assertEquals(expectedContext.getThisSources(), topologyContext.getThisSources()); + Assert.assertEquals(expectedContext.getThisStreams(), topologyContext.getThisStreams()); + Assert.assertEquals(expectedContext.getThisTargets(), topologyContext.getThisTargets()); + Assert.assertEquals(0, topologyContext.getThisWorkerTasks().size()); + + for (int taskId : topologyContext.getComponentTasks(thisComponentId)) { + Assert.assertEquals(thisComponentId, topologyContext.getComponentId(taskId)); + } + + for (String componentId : expectedContext.getComponentIds()) { + Assert.assertEquals(expectedContext.getSources(componentId), + topologyContext.getSources(componentId)); + Assert.assertEquals(expectedContext.getTargets(componentId), + topologyContext.getTargets(componentId)); + + for (String streamId : expectedContext.getComponentStreams(componentId)) { + Assert.assertEquals( + expectedContext.getComponentOutputFields(componentId, streamId).toList(), + topologyContext.getComponentOutputFields(componentId, streamId).toList()); + } + } + + for (String streamId : expectedContext.getThisStreams()) { + Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(), 
+ topologyContext.getThisOutputFields(streamId).toList()); + } + + HashMap<Integer, String> taskToComponents = new HashMap<Integer, String>(); + Set<Integer> allTaskIds = new HashSet<Integer>(); + for (String componentId : expectedContext.getComponentIds()) { + List<Integer> possibleTasks = expectedContext.getComponentTasks(componentId); + List<Integer> tasks = topologyContext.getComponentTasks(componentId); + + Iterator<Integer> p_it = possibleTasks.iterator(); + Iterator<Integer> t_it = tasks.iterator(); + while(p_it.hasNext()) { + Assert.assertTrue(t_it.hasNext()); + Assert.assertNull(taskToComponents.put(p_it.next(), componentId)); + Assert.assertTrue(allTaskIds.add(t_it.next())); + } + Assert.assertFalse(t_it.hasNext()); + } + + Assert.assertEquals(taskToComponents, expectedContext.getTaskToComponent()); + Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId())); + + try { + topologyContext.getHooks(); + Assert.fail(); + } catch (UnsupportedOperationException e) { /* expected */ } + + try { + topologyContext.getRegisteredMetricByName(null); + Assert.fail(); + } catch (UnsupportedOperationException e) { /* expected */ } + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java index 64b3e28..d3776fb 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java +++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java @@ -20,7 +20,7 @@ import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Values; -import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout; +import org.apache.flink.stormcompatibility.util.FiniteStormSpout; import java.io.IOException; import java.util.Map; http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java index 6fb764d..5efff66 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java @@ -18,7 +18,7 @@ package org.apache.flink.stormcompatibility.util; -import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout; +import org.apache.flink.stormcompatibility.util.FiniteStormSpout; /** * Implements a Storm Spout that reads String[] data stored in the memory. 
The spout stops http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java index c992b6b..5f637d3 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java @@ -49,10 +49,10 @@ public class SplitBoltTopology { final String[] tokens = outputPath.split(":"); final String outputFile = tokens[tokens.length - 1]; builder.setBolt(sinkId, new StormBoltFileSink(outputFile, formatter)) - .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); + .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); } else { builder.setBolt(sinkId, new StormBoltPrintSink(formatter), 4) - .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); + .shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId); } return builder;