[FLINK-2566] FlinkTopologyContext not populated completely - extended FlinkTopologyContext to be populated with all supportable attributes - added JUnit test - updated README.md additionally: module restructuring to get a cleaner package structure
This closes #1135 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a67a60f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a67a60f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a67a60f Branch: refs/heads/master Commit: 7a67a60f934123863ca96a95e30471c99bb8088a Parents: 39115ab Author: mjsax <mj...@informatik.hu-berlin.de> Authored: Tue Sep 15 23:59:31 2015 +0200 Committer: mjsax <mj...@informatik.hu-berlin.de> Committed: Tue Oct 6 13:29:32 2015 +0200 ---------------------------------------------------------------------- .../flink-storm-compatibility-core/README.md | 2 +- .../api/FlinkOutputFieldsDeclarer.java | 170 ---------------- .../stormcompatibility/api/FlinkTopology.java | 16 +- .../api/FlinkTopologyBuilder.java | 39 ++-- .../api/FlinkTopologyContext.java | 161 --------------- .../util/FiniteStormSpout.java | 39 ++++ .../util/FlinkOutputFieldsDeclarer.java | 168 ++++++++++++++++ .../util/FlinkStormStreamSelector.java | 2 +- .../util/FlinkTopologyContext.java | 164 ++++++++++++++++ .../util/SplitStreamTypeKeySelector.java | 46 +++++ .../wrappers/AbstractStormSpoutWrapper.java | 36 ++-- .../wrappers/FiniteStormSpout.java | 37 ---- .../wrappers/FiniteStormSpoutWrapper.java | 1 + .../wrappers/SetupOutputFieldsDeclarer.java | 63 ++++++ .../wrappers/StormBoltWrapper.java | 17 +- .../wrappers/StormOutputFieldsDeclarer.java | 63 ------ .../wrappers/StormWrapperSetupHelper.java | 192 ++++++++++++++++-- .../api/FlinkOutputFieldsDeclarerTest.java | 193 ------------------ .../api/FlinkTopologyContextTest.java | 74 ------- .../api/FlinkTopologyTest.java | 10 +- .../api/TestTopologyBuilder.java | 27 +++ .../util/FiniteTestSpout.java | 77 ++++++++ .../util/FlinkOutputFieldsDeclarerTest.java | 193 ++++++++++++++++++ .../util/FlinkStormStreamSelectorTest.java | 51 +++++ .../util/FlinkTopologyContextTest.java | 114 +++++++++++ .../stormcompatibility/util/TestDummyBolt.java | 20 +- 
.../stormcompatibility/util/TestDummySpout.java | 17 +- .../flink/stormcompatibility/util/TestSink.java | 16 +- .../wrappers/FiniteStormSpoutWrapperTest.java | 6 + .../wrappers/FiniteTestSpout.java | 77 -------- .../wrappers/FlinkStormStreamSelectorTest.java | 51 ----- .../wrappers/SetupOutputFieldsDeclarerTest.java | 91 +++++++++ .../wrappers/StormBoltWrapperTest.java | 36 ++-- .../wrappers/StormFiniteSpoutWrapperTest.java | 13 +- .../wrappers/StormOutputFieldsDeclarerTest.java | 91 --------- .../wrappers/StormSpoutWrapperTest.java | 6 + .../wrappers/StormWrapperSetupHelperTest.java | 194 ++++++++++++++++++- .../util/FiniteStormFileSpout.java | 2 +- .../util/FiniteStormInMemorySpout.java | 2 +- .../split/SplitBoltTopology.java | 4 +- 40 files changed, 1565 insertions(+), 1016 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md index f42dc24..9663fc7 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md @@ -3,7 +3,7 @@ The Storm compatibility layer allows to embed spouts or bolt unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself, can be reused unmodified. See `flink-storm-examples` for a simple word-count example. 
The following Strom features are not (yet/fully) supported by the compatibility layer right now: -* topology and tuple meta information (ie, `TopologyContext` not fully supported) +* tuple meta information * no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored) * for whole Storm topologies the following is not supported by Flink: * direct emit connection pattern http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java deleted file mode 100644 index e2f6332..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.api; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -import java.util.HashMap; -import java.util.List; - -/** - * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a - * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt - * bolt}.<br /> - * <br /> - * <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore, - * direct emit is not supported.</strong> - */ -final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { - - /** the declared output streams and schemas */ - final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>(); - - @Override - public void declare(final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); - } - - /** - * {@inheritDoc} - * <p/> - * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}. - * - * @throws UnsupportedOperationException - * if {@code direct} is {@code true} - */ - @Override - public void declare(final boolean direct, final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); - } - - @Override - public void declareStream(final String streamId, final Fields fields) { - this.declareStream(streamId, false, fields); - } - - /** - * {@inheritDoc} - * <p/> - * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}. 
- * - * @throws UnsupportedOperationException - * if {@code direct} is {@code true} - */ - @Override - public void declareStream(final String streamId, final boolean direct, final Fields fields) { - if (direct) { - throw new UnsupportedOperationException("Direct emit is not supported by Flink"); - } - - this.outputStreams.put(streamId, fields); - } - - /** - * Returns {@link TypeInformation} for the declared output schema for a specific stream. - * - * @param streamId - * A stream ID. - * - * @return output type information for the declared output schema of the specified stream; or {@code null} if - * {@code streamId == null} - * - * @throws IllegalArgumentException - * If no output schema was declared for the specified stream or if more then 25 attributes got declared. - */ - public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException { - if (streamId == null) { - return null; - } - - Fields outputSchema = this.outputStreams.get(streamId); - if (outputSchema == null) { - throw new IllegalArgumentException("Stream with ID '" + streamId - + "' was not declared."); - } - - Tuple t; - final int numberOfAttributes = outputSchema.size(); - - if (numberOfAttributes == 1) { - return TypeExtractor.getForClass(Object.class); - } else if (numberOfAttributes <= 25) { - try { - t = Tuple.getTupleClass(numberOfAttributes).newInstance(); - } catch (final InstantiationException e) { - throw new RuntimeException(e); - } catch (final IllegalAccessException e) { - throw new RuntimeException(e); - } - } else { - throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes"); - } - - // TODO: declare only key fields as DefaultComparable - for (int i = 0; i < numberOfAttributes; ++i) { - t.setField(new DefaultComparable(), i); - } - - return TypeExtractor.getForObject(t); - } - - /** - * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link - * TypeInformation} from {@link 
TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable, - * Flink cannot use them and will throw an exception. - */ - private static class DefaultComparable implements Comparable<DefaultComparable> { - - public DefaultComparable() { - } - - @Override - public int compareTo(final DefaultComparable o) { - return 0; - } - } - - /** - * Computes the indexes within the declared output schema of the specified stream, for a list of given - * field-grouping attributes. - * - * @param streamId - * A stream ID. - * @param groupingFields - * The names of the key fields. - * - * @return array of {@code int}s that contains the index within the output schema for each attribute in the given - * list - */ - public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) { - final int[] fieldIndexes = new int[groupingFields.size()]; - - for (int i = 0; i < fieldIndexes.length; ++i) { - fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i)); - } - - return fieldIndexes; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java index 8c75a2c..179466e 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java @@ -28,17 +28,14 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or * {@link FlinkClient}. */ -class FlinkTopology extends StreamExecutionEnvironment { +public class FlinkTopology extends StreamExecutionEnvironment { - /** The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology} */ - private final StormTopology stormTopology; /** The number of declared tasks for the whole program (ie, sum over all dops) */ private int numberOfTasks = 0; - public FlinkTopology(final StormTopology stormTopology) { + public FlinkTopology() { // Set default parallelism to 1, to mirror Storm default behavior super.setParallelism(1); - this.stormTopology = stormTopology; } /** @@ -52,7 +49,7 @@ class FlinkTopology extends StreamExecutionEnvironment { public JobExecutionResult execute() throws Exception { throw new UnsupportedOperationException( "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " + - "instead."); + "instead."); } /** @@ -66,12 +63,7 @@ class FlinkTopology extends StreamExecutionEnvironment { public JobExecutionResult execute(final String jobName) throws Exception { throw new UnsupportedOperationException( "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " + - "instead."); - } - - //TODO - public String getStormTopologyAsString() { - return this.stormTopology.toString(); + "instead."); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java index e4f6c94..d62d56b 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java @@ -16,7 +16,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.flink.stormcompatibility.api; import backtype.storm.generated.ComponentCommon; @@ -35,10 +34,12 @@ import backtype.storm.tuple.Fields; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.stormcompatibility.util.FiniteStormSpout; +import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer; import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector; import org.apache.flink.stormcompatibility.util.SplitStreamType; +import org.apache.flink.stormcompatibility.util.SplitStreamTypeKeySelector; import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper; -import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout; import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper; import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper; import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper; @@ -73,14 +74,18 @@ public class FlinkTopologyBuilder { private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>(); /** All spouts&bolts declarers by their ID */ private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>(); + // needs to be a class member for internal testing purpose + private StormTopology stormTopology; + /** * Creates a Flink program that uses the specified spouts and bolts. 
*/ @SuppressWarnings({"rawtypes", "unchecked"}) public FlinkTopology createTopology() { - final StormTopology stormTopology = this.stormBuilder.createTopology(); - final FlinkTopology env = new FlinkTopology(stormTopology); + this.stormTopology = this.stormBuilder.createTopology(); + + final FlinkTopology env = new FlinkTopology(); env.setParallelism(1); final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>(); @@ -102,6 +107,7 @@ public class FlinkTopologyBuilder { } else { spoutWrapper = new StormSpoutWrapper(userSpout); } + spoutWrapper.setStormTopology(stormTopology); DataStreamSource source; HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>(); @@ -126,6 +132,8 @@ public class FlinkTopologyBuilder { if (common.is_set_parallelism_hint()) { dop = common.get_parallelism_hint(); source.setParallelism(dop); + } else { + common.set_parallelism_hint(1); } env.increaseNumberOfTasks(dop); } @@ -217,6 +225,7 @@ public class FlinkTopologyBuilder { } SingleOutputStreamOperator outputStream; + StormBoltWrapper boltWrapper; if (boltOutputStreams.size() < 2) { // single output stream or sink String outputStreamId = null; if (boltOutputStreams.size() == 1) { @@ -225,11 +234,9 @@ public class FlinkTopologyBuilder { final TypeInformation<?> outType = declarer .getOutputType(outputStreamId); - outputStream = inputStream.transform( - boltId, - outType, - new StormBoltWrapper(userBolt, this.outputStreams.get( - producerId).get(inputStreamId))); + boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams + .get(producerId).get(inputStreamId)); + outputStream = inputStream.transform(boltId, outType, boltWrapper); if (outType != null) { // only for non-sink nodes @@ -241,11 +248,8 @@ public class FlinkTopologyBuilder { final TypeInformation<?> outType = TypeExtractor .getForClass(SplitStreamType.class); - outputStream = inputStream.transform( - boltId, - outType, - new 
StormBoltWrapper(userBolt, this.outputStreams.get( - producerId).get(inputStreamId))); + boltWrapper = new StormBoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId)); + outputStream = inputStream.transform(boltId, outType, boltWrapper); SplitStream splitStreams = outputStream .split(new FlinkStormStreamSelector()); @@ -256,11 +260,14 @@ public class FlinkTopologyBuilder { } availableInputs.put(boltId, op); } + boltWrapper.setStormTopology(stormTopology); int dop = 1; if (common.is_set_parallelism_hint()) { dop = common.get_parallelism_hint(); outputStream.setParallelism(dop); + } else { + common.set_parallelism_hint(1); } env.increaseNumberOfTasks(dop); @@ -393,4 +400,8 @@ public class FlinkTopologyBuilder { * } */ + // for internal testing purpose only + StormTopology getStormTopology() { + return this.stormTopology; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java deleted file mode 100644 index a761617..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.api; - -import backtype.storm.generated.StormTopology; -import backtype.storm.hooks.ITaskHook; -import backtype.storm.metric.api.CombinedMetric; -import backtype.storm.metric.api.ICombiner; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IReducer; -import backtype.storm.metric.api.ReducedMetric; -import backtype.storm.state.ISubscribedState; -import backtype.storm.task.TopologyContext; - -import java.util.Collection; -import java.util.Map; - -/** - * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when - * a Storm topology is executed within Flink. - */ -public class FlinkTopologyContext extends TopologyContext { - - /** - * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated - * for each parallel task - * - * @param topology - * The Storm topology that is currently executed - * @param taskToComponents - * A map from task IDs to Component IDs - * @param taskId - * The ID of the task the context belongs to. 
- */ - public FlinkTopologyContext(final StormTopology topology, final Map<Integer, String> taskToComponents, - final Integer taskId) { - super(topology, null, taskToComponents, null, null, null, null, null, taskId, null, null, null, null, null, - null, null); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public void addTaskHook(final ITaskHook hook) { - throw new UnsupportedOperationException("Task hooks are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public Collection<ITaskHook> getHooks() { - throw new UnsupportedOperationException("Task hooks are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public IMetric getRegisteredMetricByName(final String name) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @SuppressWarnings("rawtypes") - @Override - public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @SuppressWarnings("rawtypes") - @Override - public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - } - - /** - * Not supported by Flink. 
- * - * @throws UnsupportedOperationException - * at every invocation - */ - @SuppressWarnings("unchecked") - @Override - public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { - throw new UnsupportedOperationException("Metrics are not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public <T extends ISubscribedState> T setAllSubscribedState(final T obj) { - throw new UnsupportedOperationException("Not supported by Flink"); - - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) { - throw new UnsupportedOperationException("Not supported by Flink"); - } - - /** - * Not supported by Flink. - * - * @throws UnsupportedOperationException - * at every invocation - */ - @Override - public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T - obj) { - throw new UnsupportedOperationException("Not supported by Flink"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java new file mode 100644 index 0000000..114fa7c --- /dev/null +++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.util; + +import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper; + +import backtype.storm.topology.IRichSpout; + +/** + * This interface represents a Storm spout that emits a finite number of records. Common Storm + * spouts emit infinite streams by default. To change this behaviour and take advantage of + * Flink's finite-source capabilities, the spout should implement this interface. To wrap + * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}. + */ +public interface FiniteStormSpout extends IRichSpout { + + /** + * When returns true, the spout has reached the end of the stream. 
+ * + * @return true, if the spout's stream reached its end, false otherwise + */ + public boolean reachedEnd(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java new file mode 100644 index 0000000..3eee8d6 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.stormcompatibility.util; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.util.HashMap; +import java.util.List; + +/** + * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a + * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br /> + * <br /> + * <strong>CAUTION: Flink does not support direct emit.</strong> + */ +public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { + + /** The declared output streams and schemas. */ + public final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>(); + + @Override + public void declare(final Fields fields) { + this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); + } + + /** + * {@inheritDoc} + * <p/> + * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}. + * + * @throws UnsupportedOperationException + * if {@code direct} is {@code true} + */ + @Override + public void declare(final boolean direct, final Fields fields) { + this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); + } + + @Override + public void declareStream(final String streamId, final Fields fields) { + this.declareStream(streamId, false, fields); + } + + /** + * {@inheritDoc} + * <p/> + * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}. 
+ * + * @throws UnsupportedOperationException + * if {@code direct} is {@code true} + */ + @Override + public void declareStream(final String streamId, final boolean direct, final Fields fields) { + if (direct) { + throw new UnsupportedOperationException("Direct emit is not supported by Flink"); + } + + this.outputStreams.put(streamId, fields); + } + + /** + * Returns {@link TypeInformation} for the declared output schema for a specific stream. + * + * @param streamId + * A stream ID. + * + * @return output type information for the declared output schema of the specified stream; or {@code null} if + * {@code streamId == null} + * + * @throws IllegalArgumentException + * If no output schema was declared for the specified stream or if more then 25 attributes got declared. + */ + public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException { + if (streamId == null) { + return null; + } + + Fields outputSchema = this.outputStreams.get(streamId); + if (outputSchema == null) { + throw new IllegalArgumentException("Stream with ID '" + streamId + + "' was not declared."); + } + + Tuple t; + final int numberOfAttributes = outputSchema.size(); + + if (numberOfAttributes == 1) { + return TypeExtractor.getForClass(Object.class); + } else if (numberOfAttributes <= 25) { + try { + t = Tuple.getTupleClass(numberOfAttributes).newInstance(); + } catch (final InstantiationException e) { + throw new RuntimeException(e); + } catch (final IllegalAccessException e) { + throw new RuntimeException(e); + } + } else { + throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes"); + } + + // TODO: declare only key fields as DefaultComparable + for (int i = 0; i < numberOfAttributes; ++i) { + t.setField(new DefaultComparable(), i); + } + + return TypeExtractor.getForObject(t); + } + + /** + * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link + * TypeInformation} from {@link 
TypeExtractor} within {@link #getOutputType(String)}. If key fields are not comparable, + * Flink cannot use them and will throw an exception. + */ + private static class DefaultComparable implements Comparable<DefaultComparable> { + + public DefaultComparable() { + } + + @Override + public int compareTo(final DefaultComparable o) { + return 0; + } + } + + /** + * Computes the indexes within the declared output schema of the specified stream, for a list of given + * field-grouping attributes. + * + * @param streamId + * A stream ID. + * @param groupingFields + * The names of the key fields. + * + * @return array of {@code int}s that contains the index within the output schema for each attribute in the given + * list + */ + public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) { + final int[] fieldIndexes = new int[groupingFields.size()]; + + for (int i = 0; i < fieldIndexes.length; ++i) { + fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i)); + } + + return fieldIndexes; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java index 7ca45d6..7e60a87 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java +++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java @@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector; /** * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink. */ -final public class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> { +public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> { private static final long serialVersionUID = 2553423379715401023L; /** internal cache to avoid short living ArrayList objects. */ http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java new file mode 100644 index 0000000..14af830 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.util; + +import backtype.storm.generated.StormTopology; +import backtype.storm.hooks.ITaskHook; +import backtype.storm.metric.api.CombinedMetric; +import backtype.storm.metric.api.ICombiner; +import backtype.storm.metric.api.IMetric; +import backtype.storm.metric.api.IReducer; +import backtype.storm.metric.api.ReducedMetric; +import backtype.storm.state.ISubscribedState; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Fields; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import clojure.lang.Atom; + +/** + * {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides certain methods that are not applicable when + * a Storm topology is executed within Flink. + */ +public final class FlinkTopologyContext extends TopologyContext { + + /** + * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. 
The context object is instantiated + * for each parallel task + */ + public FlinkTopologyContext(final StormTopology topology, @SuppressWarnings("rawtypes") final Map stormConf, + final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks, + final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir, + final String pidDir, final Integer taskId, final Integer workerPort, final List<Integer> workerTasks, + final Map<String, Object> defaultResources, final Map<String, Object> userResources, + final Map<String, Object> executorData, @SuppressWarnings("rawtypes") final Map registeredMetrics, + final Atom openOrPrepareWasCalled) { + super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, + codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData, + registeredMetrics, openOrPrepareWasCalled); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public void addTaskHook(final ITaskHook hook) { + throw new UnsupportedOperationException("Task hooks are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public Collection<ITaskHook> getHooks() { + throw new UnsupportedOperationException("Task hooks are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public IMetric getRegisteredMetricByName(final String name) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + + } + + /** + * Not supported by Flink. 
+ * + * @throws UnsupportedOperationException + * at every invocation + */ + @SuppressWarnings("rawtypes") + @Override + public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @SuppressWarnings("rawtypes") + @Override + public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @SuppressWarnings("unchecked") + @Override + public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { + throw new UnsupportedOperationException("Metrics are not supported by Flink"); + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public <T extends ISubscribedState> T setAllSubscribedState(final T obj) { + throw new UnsupportedOperationException("Not supported by Flink"); + + } + + /** + * Not supported by Flink. + * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) { + throw new UnsupportedOperationException("Not supported by Flink"); + } + + /** + * Not supported by Flink. 
+ * + * @throws UnsupportedOperationException + * at every invocation + */ + @Override + public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T + obj) { + throw new UnsupportedOperationException("Not supported by Flink"); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java new file mode 100644 index 0000000..200f772 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.stormcompatibility.util; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.streaming.util.keys.KeySelectorUtil; +import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector; + +/** + * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via + * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams. + * + * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular + * {@link ArrayKeySelector} on it. + */ +public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> { + private static final long serialVersionUID = 4672434660037669254L; + + private final ArrayKeySelector<Tuple> selector; + + public SplitStreamTypeKeySelector(int... fields) { + this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields); + } + + @Override + public Tuple getKey(SplitStreamType<Tuple> value) throws Exception { + return selector.getKey(value.value); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java index c531580..ccd29bb 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java +++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java @@ -20,6 +20,7 @@ package org.apache.flink.stormcompatibility.wrappers; import java.util.Collection; import java.util.HashMap; +import backtype.storm.generated.StormTopology; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.topology.IRichSpout; @@ -43,22 +44,16 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> { private static final long serialVersionUID = 4993283609095408765L; - /** - * Number of attributes of the bolt's output tuples per stream. - */ + /** Number of attributes of the bolt's output tuples per stream. */ private final HashMap<String, Integer> numberOfAttributes; - /** - * The wrapped Storm {@link IRichSpout spout}. - */ + /** The wrapped Storm {@link IRichSpout spout}. */ protected final IRichSpout spout; - /** - * The wrapper of the given Flink collector. - */ + /** The wrapper of the given Flink collector. */ protected StormSpoutCollector<OUT> collector; - /** - * Indicates, if the source is still running or was canceled. - */ + /** Indicates, if the source is still running or was canceled. */ protected volatile boolean isRunning = true; + /** The original Storm topology. */ + protected StormTopology stormTopology; /** * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such @@ -98,6 +93,16 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs); } + /** + * Sets the original Storm topology. + * + * @param stormTopology + * The original Storm topology. 
+ */ + public void setStormTopology(StormTopology stormTopology) { + this.stormTopology = stormTopology; + } + @Override public final void run(final SourceContext<OUT> ctx) throws Exception { this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx); @@ -114,8 +119,11 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF } this.spout.open(stormConfig, - StormWrapperSetupHelper - .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true), + StormWrapperSetupHelper.createTopologyContext( + (StreamingRuntimeContext) super.getRuntimeContext(), + this.spout, + this.stormTopology, + null), new SpoutOutputCollector(this.collector)); this.spout.activate(); this.execute(); http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java deleted file mode 100644 index 58a4f7a..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import backtype.storm.topology.IRichSpout; - -/** - * This interface represents a Storm spout that emits a finite number of records. Common Storm - * spouts emit infinite streams by default. To change this behaviour and take advantage of - * Flink's finite-source capabilities, the spout should implement this interface. To wrap - * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}. - */ -public interface FiniteStormSpout extends IRichSpout { - - /** - * When returns true, the spout has reached the end of the stream. 
- * - * @return true, if the spout's stream reached its end, false otherwise - */ - public boolean reachedEnd(); - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java index 1912afc..f499ecc 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java @@ -22,6 +22,7 @@ import java.util.Collection; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.stormcompatibility.util.FiniteStormSpout; import com.google.common.collect.Sets; http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java new file mode 100644 index 0000000..3cd27d4 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.stormcompatibility.wrappers; + +import java.util.HashMap; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; + +/** + * {@link SetupOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and + * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)} + * method. + */ +class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer { + + /** The number of attributes for each declared stream by the wrapped operator. 
*/ + HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>(); + + @Override + public void declare(final Fields fields) { + this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); + } + + @Override + public void declare(final boolean direct, final Fields fields) { + this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); + } + + @Override + public void declareStream(final String streamId, final Fields fields) { + this.declareStream(streamId, false, fields); + } + + @Override + public void declareStream(final String streamId, final boolean direct, final Fields fields) { + if (streamId == null) { + throw new IllegalArgumentException("Stream ID cannot be null."); + } + if (direct) { + throw new UnsupportedOperationException("Direct emit is not supported by Flink"); + } + + this.outputSchemas.put(streamId, fields.size()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java index 6b58b0a..715d6df 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java @@ -19,6 +19,7 @@ package org.apache.flink.stormcompatibility.wrappers; import java.util.Collection; import java.util.HashMap; +import backtype.storm.generated.StormTopology; import 
backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; @@ -59,6 +60,8 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple private final HashMap<String, Integer> numberOfAttributes; /** The schema (ie, ordered field names) of the input stream. */ private final Fields inputSchema; + /** The original Storm topology. */ + protected StormTopology stormTopology; /** * We have to use this because Operators must output @@ -193,12 +196,22 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs); } + /** + * Sets the original Storm topology. + * + * @param stormTopology + * The original Storm topology. + */ + public void setStormTopology(StormTopology stormTopology) { + this.stormTopology = stormTopology; + } + @Override public void open(final Configuration parameters) throws Exception { super.open(parameters); - final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext( - super.runtimeContext, false); + final TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext( + super.runtimeContext, this.bolt, this.stormTopology, null); flinkCollector = new TimestampedCollector<OUT>(output); OutputCollector stormCollector = null; http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java deleted file mode 100644 index f33d4d3..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.stormcompatibility.wrappers; - -import java.util.HashMap; - -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -/** - * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and - * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)} - * method. - */ -class StormOutputFieldsDeclarer implements OutputFieldsDeclarer { - - /** The number of attributes for each declared stream by the wrapped operator. 
*/ - HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>(); - - @Override - public void declare(final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields); - } - - @Override - public void declare(final boolean direct, final Fields fields) { - this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields); - } - - @Override - public void declareStream(final String streamId, final Fields fields) { - this.declareStream(streamId, false, fields); - } - - @Override - public void declareStream(final String streamId, final boolean direct, final Fields fields) { - if (streamId == null) { - throw new IllegalArgumentException("Stream ID cannot be null."); - } - if (direct) { - throw new UnsupportedOperationException("Direct emit is not supported by Flink"); - } - - this.outputSchemas.put(streamId, fields.size()); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java index 75ab8e0..891497e 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java @@ -14,32 +14,43 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.flink.stormcompatibility.wrappers; +import backtype.storm.Config; import backtype.storm.generated.Bolt; import backtype.storm.generated.ComponentCommon; import backtype.storm.generated.SpoutSpec; +import backtype.storm.generated.StateSpoutSpec; import backtype.storm.generated.StormTopology; +import backtype.storm.generated.StreamInfo; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IComponent; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.IRichSpout; +import backtype.storm.tuple.Fields; -import org.apache.flink.stormcompatibility.api.FlinkTopologyContext; +import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer; +import org.apache.flink.stormcompatibility.util.FlinkTopologyContext; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; +import clojure.lang.Atom; + +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; /** - * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or + * {@link StormWrapperSetupHelper} is a helper class used by {@link AbstractStormSpoutWrapper} and * {@link StormBoltWrapper}. */ class StormWrapperSetupHelper { + /** The configuration key for the topology name. */ + final static String TOPOLOGY_NAME = "storm.topology.name"; + /** * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper} * per declared output stream. 
The number is {@code -1} for raw output type or a value within range [0;25] for @@ -60,7 +71,7 @@ class StormWrapperSetupHelper { public static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt, final Collection<String> rawOutputs) throws IllegalArgumentException { - final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer(); + final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer(); spoutOrBolt.declareOutputFields(declarer); for (Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) { @@ -84,27 +95,174 @@ class StormWrapperSetupHelper { return declarer.outputSchemas; } - // TODO - public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context, - final boolean spoutOrBolt) { - final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask()); + /** Used to compute unique task IDs for a Storm topology. */ + private static int tid; + + /** + * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, Flink task / Storm executor). + * + * @param context + * The Flink runtime context. + * @param spoutOrBolt + * The Spout or Bolt this context is created for. + * @param stormTopology + * The original Storm topology. + * @param stormConfig + * The user provided configuration. + * @return The created {@link TopologyContext}. 
+ */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static synchronized TopologyContext createTopologyContext( + final StreamingRuntimeContext context, final IComponent spoutOrBolt, + StormTopology stormTopology, Map stormConfig) { + String operatorName = context.getTaskName(); + if (operatorName.startsWith("Source: ")) { + // prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here + operatorName = operatorName.substring(8); + } + final int dop = context.getNumberOfParallelSubtasks(); final Map<Integer, String> taskToComponents = new HashMap<Integer, String>(); - taskToComponents.put(taskId, context.getTaskName()); + final Map<String, List<Integer>> componentToSortedTasks = new HashMap<String, List<Integer>>(); + final Map<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>(); + String stormId = (String) stormConfig.get(TOPOLOGY_NAME); + String codeDir = null; // not supported + String pidDir = null; // not supported + Integer taskId = null; + Integer workerPort = null; // not supported + List<Integer> workerTasks = new ArrayList<Integer>(); + final Map<String, Object> defaultResources = new HashMap<String, Object>(); + final Map<String, Object> userResources = new HashMap<String, Object>(); + final Map<String, Object> executorData = new HashMap<String, Object>(); + final Map registeredMetrics = new HashMap(); + Atom openOrPrepareWasCalled = null; - final ComponentCommon common = new ComponentCommon(); - common.set_parallelism_hint(context.getNumberOfParallelSubtasks()); + if (stormTopology == null) { + // embedded mode + ComponentCommon common = new ComponentCommon(); + common.set_parallelism_hint(dop); - final Map<String, Bolt> bolts = new HashMap<String, Bolt>(); - final Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>(); + HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>(); + HashMap<String, Bolt> bolts = new HashMap<String, Bolt>(); + if 
(spoutOrBolt instanceof IRichSpout) { + spouts.put(operatorName, new SpoutSpec(null, common)); + } else { + assert (spoutOrBolt instanceof IRichBolt); + bolts.put(operatorName, new Bolt(null, common)); + } + stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>()); - if (spoutOrBolt) { - spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common)); + taskId = context.getIndexOfThisSubtask(); + + List<Integer> sortedTasks = new ArrayList<Integer>(dop); + for (int i = 1; i <= dop; ++i) { + taskToComponents.put(i, operatorName); + sortedTasks.add(i); + } + componentToSortedTasks.put(operatorName, sortedTasks); + + FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); + spoutOrBolt.declareOutputFields(declarer); + componentToStreamToFields.put(operatorName, declarer.outputStreams); } else { - bolts.put(context.getTaskName(), new Bolt(null, common)); + // whole topology is built (ie, FlinkTopologyBuilder is used) + Map<String, SpoutSpec> spouts = stormTopology.get_spouts(); + Map<String, Bolt> bolts = stormTopology.get_bolts(); + Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts(); + + tid = 1; + + for (Entry<String, SpoutSpec> spout : spouts.entrySet()) { + Integer rc = processSingleOperator(spout.getKey(), spout.getValue().get_common(), + operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents, + componentToSortedTasks, componentToStreamToFields); + if (rc != null) { + taskId = rc; + } + } + for (Entry<String, Bolt> bolt : bolts.entrySet()) { + Integer rc = processSingleOperator(bolt.getKey(), bolt.getValue().get_common(), + operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents, + componentToSortedTasks, componentToStreamToFields); + if (rc != null) { + taskId = rc; + } + } + for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) { + Integer rc = taskId = processSingleOperator(stateSpout.getKey(), stateSpout + .getValue().get_common(), operatorName, 
context.getIndexOfThisSubtask(), + dop, taskToComponents, componentToSortedTasks, componentToStreamToFields); + if (rc != null) { + taskId = rc; + } + } + assert (taskId != null); + } + + if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) { + stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value + } + + return new FlinkTopologyContext(stormTopology, stormConfig, taskToComponents, + componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, + taskId, workerPort, workerTasks, defaultResources, userResources, executorData, + registeredMetrics, openOrPrepareWasCalled); + } + + /** + * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a + * single instance of a Spout or Bolt (i.e., task or executor). Furthermore, it computes the unique task-id. + * + * @param componentId + * The ID of the Spout/Bolt in the topology. + * @param common + * The common operator object (that all Spouts and Bolts have). + * @param operatorName + * The Flink operator name. + * @param index + * The index of the currently processed task within its operator. + * @param dop + * The parallelism of the operator. + * @param taskToComponents + * OUTPUT: A map from all task IDs of the topology to their component IDs. + * @param componentToSortedTasks + * OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs. + * @param componentToStreamToFields + * OUTPUT: A map from all component IDs to their output streams and output fields. + * + * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current + * Flink operator ({@code operatorName}) -- {@code null} otherwise. 
+ */ + private static Integer processSingleOperator(final String componentId, + final ComponentCommon common, final String operatorName, final int index, + final int dop, final Map<Integer, String> taskToComponents, + final Map<String, List<Integer>> componentToSortedTasks, + final Map<String, Map<String, Fields>> componentToStreamToFields) { + final int parallelism_hint = common.get_parallelism_hint(); + Integer taskId = null; + + if (componentId.equals(operatorName)) { + taskId = tid + index; + } + + List<Integer> sortedTasks = new ArrayList<Integer>(dop); + for (int i = 0; i < parallelism_hint; ++i) { + taskToComponents.put(tid, componentId); + sortedTasks.add(tid); + ++tid; + } + componentToSortedTasks.put(componentId, sortedTasks); + + if (componentId.equals(operatorName)) { + } + + Map<String, Fields> outputStreams = new HashMap<String, Fields>(); + for(Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) { + outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields())); } + componentToStreamToFields.put(componentId, outputStreams); - return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId); + return taskId; } } http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java deleted file mode 100644 index 08ac60b..0000000 --- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.stormcompatibility.api; - -import backtype.storm.tuple.Fields; -import backtype.storm.utils.Utils; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.stormcompatibility.util.AbstractTest; -import org.junit.Assert; -import org.junit.Test; - -import java.util.LinkedList; - -public class FlinkOutputFieldsDeclarerTest extends AbstractTest { - - - - @Test - public void testNull() { - Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null)); - } - - @Test - public void testDeclare() { - for (int i = 0; i < 2; ++i) { // test case: simple / non-direct - for (int j = 1; j < 2; ++j) { // number of streams - for (int k = 0; k <= 25; ++k) { // number of attributes - this.runDeclareTest(i, j, k); - } - } - } - } - - @Test(expected = IllegalArgumentException.class) - public void testDeclareSimpleToManyAttributes() { - this.runDeclareTest(0, 
this.r.nextBoolean() ? 1 : 2, 26); - } - - @Test(expected = IllegalArgumentException.class) - public void testDeclareNonDirectToManyAttributes() { - this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26); - } - - @Test(expected = IllegalArgumentException.class) - public void testDeclareDefaultStreamToManyAttributes() { - this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26); - } - - @Test(expected = IllegalArgumentException.class) - public void testDeclareFullToManyAttributes() { - this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26); - } - - private void runDeclareTest(final int testCase, final int numberOfStreams, - final int numberOfAttributes) { - final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); - - String[] streams = null; - if (numberOfStreams > 1 || r.nextBoolean()) { - streams = new String[numberOfStreams]; - for (int i = 0; i < numberOfStreams; ++i) { - streams[i] = "stream" + i; - } - } - - final String[] attributes = new String[numberOfAttributes]; - for (int i = 0; i < attributes.length; ++i) { - attributes[i] = "a" + i; - } - - switch (testCase) { - case 0: - this.declareSimple(declarer, streams, attributes); - break; - default: - this.declareNonDirect(declarer, streams, attributes); - } - - if (streams == null) { - streams = new String[] { Utils.DEFAULT_STREAM_ID }; - } - - for (String stream : streams) { - final TypeInformation<?> type = declarer.getOutputType(stream); - - if (numberOfAttributes == 1) { - Assert.assertEquals(type.getClass(), GenericTypeInfo.class); - Assert.assertEquals(type.getTypeClass(), Object.class); - } else { - Assert.assertEquals(numberOfAttributes, type.getArity()); - Assert.assertTrue(type.isTupleType()); - } - } - } - - private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams, - final String[] attributes) { - - if (streams != null) { - for (String stream : streams) { - declarer.declareStream(stream, new Fields(attributes)); - } - } else { - 
declarer.declare(new Fields(attributes)); - } - } - - private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams, - final String[] attributes) { - - if (streams != null) { - for (String stream : streams) { - declarer.declareStream(stream, false, new Fields(attributes)); - } - } else { - declarer.declare(false, new Fields(attributes)); - } - } - - @Test(expected = IllegalArgumentException.class) - public void testUndeclared() { - final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); - declarer.getOutputType("unknownStreamId"); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeclareDirect() { - new FlinkOutputFieldsDeclarer().declare(true, null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testDeclareDirect2() { - new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null); - } - - @Test - public void testGetGroupingFieldIndexes() { - final int numberOfAttributes = 5 + this.r.nextInt(21); - final String[] attributes = new String[numberOfAttributes]; - for (int i = 0; i < numberOfAttributes; ++i) { - attributes[i] = "a" + i; - } - - final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); - declarer.declare(new Fields(attributes)); - - final int numberOfKeys = 1 + this.r.nextInt(25); - final LinkedList<String> groupingFields = new LinkedList<String>(); - final boolean[] indexes = new boolean[numberOfAttributes]; - - for (int i = 0; i < numberOfAttributes; ++i) { - if (this.r.nextInt(26) < numberOfKeys) { - groupingFields.add(attributes[i]); - indexes[i] = true; - } else { - indexes[i] = false; - } - } - - final int[] expectedResult = new int[groupingFields.size()]; - int j = 0; - for (int i = 0; i < numberOfAttributes; ++i) { - if (indexes[i]) { - expectedResult[j++] = i; - } - } - - final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID, - groupingFields); - - 
Assert.assertEquals(expectedResult.length, result.length); - for (int i = 0; i < expectedResult.length; ++i) { - Assert.assertEquals(expectedResult[i], result[i]); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java deleted file mode 100644 index d214610..0000000 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.stormcompatibility.api; - -import backtype.storm.metric.api.ICombiner; -import backtype.storm.metric.api.IMetric; -import backtype.storm.metric.api.IReducer; -import org.junit.Test; - -public class FlinkTopologyContextTest { - - @Test(expected = UnsupportedOperationException.class) - public void testAddTaskHook() { - new FlinkTopologyContext(null, null, null).addTaskHook(null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testGetHooks() { - new FlinkTopologyContext(null, null, null).getHooks(); - } - - @SuppressWarnings("rawtypes") - @Test(expected = UnsupportedOperationException.class) - public void testRegisteredMetric1() { - new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner) null, 0); - } - - @SuppressWarnings("rawtypes") - @Test(expected = UnsupportedOperationException.class) - public void testRegisteredMetric2() { - new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer) null, 0); - } - - @Test(expected = UnsupportedOperationException.class) - public void testRegisteredMetric3() { - new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric) null, 0); - } - - @Test(expected = UnsupportedOperationException.class) - public void testGetRegisteredMetricByName() { - new FlinkTopologyContext(null, null, null).getRegisteredMetricByName(null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testSetAllSubscribedState() { - new FlinkTopologyContext(null, null, null).setAllSubscribedState(null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testSetSubscribedState1() { - new FlinkTopologyContext(null, null, null).setSubscribedState(null, null); - } - - @Test(expected = UnsupportedOperationException.class) - public void testSetSubscribedState2() { - new FlinkTopologyContext(null, null, null).setSubscribedState(null, null, null); - } - -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java index f179919..c98c9a3 100644 --- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java @@ -24,23 +24,23 @@ public class FlinkTopologyTest { @Test public void testDefaultParallelism() { - final FlinkTopology topology = new FlinkTopology(null); + final FlinkTopology topology = new FlinkTopology(); Assert.assertEquals(1, topology.getParallelism()); } @Test(expected = UnsupportedOperationException.class) public void testExecute() throws Exception { - new FlinkTopology(null).execute(); + new FlinkTopology().execute(); } @Test(expected = UnsupportedOperationException.class) public void testExecuteWithName() throws Exception { - new FlinkTopology(null).execute(null); + new FlinkTopology().execute(null); } @Test public void testNumberOfTasks() { - final FlinkTopology topology = new FlinkTopology(null); + final FlinkTopology topology = new FlinkTopology(); Assert.assertEquals(0, topology.getNumberOfTasks()); @@ -56,7 +56,7 @@ public class FlinkTopologyTest { @Test(expected = AssertionError.class) public void testAssert() { - new FlinkTopology(null).increaseNumberOfTasks(0); + new FlinkTopology().increaseNumberOfTasks(0); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java new file mode 100644 index 0000000..f664e58 --- /dev/null +++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.stormcompatibility.api; + +import backtype.storm.generated.StormTopology; + +public class TestTopologyBuilder extends FlinkTopologyBuilder { + @Override + public StormTopology getStormTopology() { + return super.getStormTopology(); + } +}