[FLINK-2566] FlinkTopologyContext not populated completely
  - extended FlinkTopologyContext to be populated with all supportable attributes
  - added JUnit test
  - updated README.md
additionally: module restructuring to get a cleaner package structure

This closes #1135


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a67a60f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a67a60f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a67a60f

Branch: refs/heads/master
Commit: 7a67a60f934123863ca96a95e30471c99bb8088a
Parents: 39115ab
Author: mjsax <mj...@informatik.hu-berlin.de>
Authored: Tue Sep 15 23:59:31 2015 +0200
Committer: mjsax <mj...@informatik.hu-berlin.de>
Committed: Tue Oct 6 13:29:32 2015 +0200

----------------------------------------------------------------------
 .../flink-storm-compatibility-core/README.md    |   2 +-
 .../api/FlinkOutputFieldsDeclarer.java          | 170 ----------------
 .../stormcompatibility/api/FlinkTopology.java   |  16 +-
 .../api/FlinkTopologyBuilder.java               |  39 ++--
 .../api/FlinkTopologyContext.java               | 161 ---------------
 .../util/FiniteStormSpout.java                  |  39 ++++
 .../util/FlinkOutputFieldsDeclarer.java         | 168 ++++++++++++++++
 .../util/FlinkStormStreamSelector.java          |   2 +-
 .../util/FlinkTopologyContext.java              | 164 ++++++++++++++++
 .../util/SplitStreamTypeKeySelector.java        |  46 +++++
 .../wrappers/AbstractStormSpoutWrapper.java     |  36 ++--
 .../wrappers/FiniteStormSpout.java              |  37 ----
 .../wrappers/FiniteStormSpoutWrapper.java       |   1 +
 .../wrappers/SetupOutputFieldsDeclarer.java     |  63 ++++++
 .../wrappers/StormBoltWrapper.java              |  17 +-
 .../wrappers/StormOutputFieldsDeclarer.java     |  63 ------
 .../wrappers/StormWrapperSetupHelper.java       | 192 ++++++++++++++++--
 .../api/FlinkOutputFieldsDeclarerTest.java      | 193 ------------------
 .../api/FlinkTopologyContextTest.java           |  74 -------
 .../api/FlinkTopologyTest.java                  |  10 +-
 .../api/TestTopologyBuilder.java                |  27 +++
 .../util/FiniteTestSpout.java                   |  77 ++++++++
 .../util/FlinkOutputFieldsDeclarerTest.java     | 193 ++++++++++++++++++
 .../util/FlinkStormStreamSelectorTest.java      |  51 +++++
 .../util/FlinkTopologyContextTest.java          | 114 +++++++++++
 .../stormcompatibility/util/TestDummyBolt.java  |  20 +-
 .../stormcompatibility/util/TestDummySpout.java |  17 +-
 .../flink/stormcompatibility/util/TestSink.java |  16 +-
 .../wrappers/FiniteStormSpoutWrapperTest.java   |   6 +
 .../wrappers/FiniteTestSpout.java               |  77 --------
 .../wrappers/FlinkStormStreamSelectorTest.java  |  51 -----
 .../wrappers/SetupOutputFieldsDeclarerTest.java |  91 +++++++++
 .../wrappers/StormBoltWrapperTest.java          |  36 ++--
 .../wrappers/StormFiniteSpoutWrapperTest.java   |  13 +-
 .../wrappers/StormOutputFieldsDeclarerTest.java |  91 ---------
 .../wrappers/StormSpoutWrapperTest.java         |   6 +
 .../wrappers/StormWrapperSetupHelperTest.java   | 194 ++++++++++++++++++-
 .../util/FiniteStormFileSpout.java              |   2 +-
 .../util/FiniteStormInMemorySpout.java          |   2 +-
 .../split/SplitBoltTopology.java                |   4 +-
 40 files changed, 1565 insertions(+), 1016 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
index f42dc24..9663fc7 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/README.md
@@ -3,7 +3,7 @@
 The Storm compatibility layer allows embedding spouts and bolts unmodified within 
a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). 
Additionally, a whole Storm topology can be submitted to Flink (see 
`FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few 
minor changes to the original submitting code are required. The code that 
builds the topology itself can be reused unmodified. See 
`flink-storm-examples` for a simple word-count example.
 
 The following Storm features are not (yet/fully) supported by the 
compatibility layer right now:
-* topology and tuple meta information (ie, `TopologyContext` not fully 
supported)
+* tuple meta information
 * no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring 
are ignored)
 * for whole Storm topologies the following is not supported by Flink:
   * direct emit connection pattern

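For reference, the submission path the README describes could look like the
following minimal sketch. `WordCountSpout` and `CounterBolt` are hypothetical
user components; the builder and cluster entry points are assumed from the
compatibility layer's API:

    import backtype.storm.Config;
    import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
    import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;

    public class WordCountSubmitter {
        public static void main(final String[] args) throws Exception {
            // same builder API as Storm's TopologyBuilder
            final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
            builder.setSpout("sentences", new WordCountSpout());
            builder.setBolt("counter", new CounterBolt()).shuffleGrouping("sentences");

            // hand the resulting FlinkTopology to a cluster; it cannot be executed directly
            final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
            cluster.submitTopology("wordCount", new Config(), builder.createTopology());
        }
    }
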
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index e2f6332..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema 
of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link 
backtype.storm.topology.IRichBolt
- * bolt}.<br />
- * <br />
- * <strong>CAUTION: Currently, Flink does only support the default output 
stream. Furthermore,
- * direct emit is not supported.</strong>
- */
-final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-       /** the declared output streams and schemas */
-       final HashMap<String, Fields> outputStreams = new HashMap<String, 
Fields>();
-
-       @Override
-       public void declare(final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-       }
-
-       /**
-        * {@inheritDoc}
-        * <p/>
-        * Direct emit is no supported by Flink. Parameter {@code direct} must 
be {@code false}.
-        *
-        * @throws UnsupportedOperationException
-        *              if {@code direct} is {@code true}
-        */
-       @Override
-       public void declare(final boolean direct, final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-       }
-
-       @Override
-       public void declareStream(final String streamId, final Fields fields) {
-               this.declareStream(streamId, false, fields);
-       }
-
-       /**
-        * {@inheritDoc}
-        * <p/>
-        * Direct emit is no supported by Flink. Parameter {@code direct} must 
be {@code false}.
-        *
-        * @throws UnsupportedOperationException
-        *              if {@code direct} is {@code true}
-        */
-       @Override
-       public void declareStream(final String streamId, final boolean direct, 
final Fields fields) {
-               if (direct) {
-                       throw new UnsupportedOperationException("Direct emit is 
not supported by Flink");
-               }
-
-               this.outputStreams.put(streamId, fields);
-       }
-
-       /**
-        * Returns {@link TypeInformation} for the declared output schema for a 
specific stream.
-        * 
-        * @param streamId
-        *            A stream ID.
-        * 
-        * @return output type information for the declared output schema of 
the specified stream; or {@code null} if
-        *         {@code streamId == null}
-        * 
-        * @throws IllegalArgumentException
-        *             If no output schema was declared for the specified 
stream or if more then 25 attributes got declared.
-        */
-       public TypeInformation<?> getOutputType(final String streamId) throws 
IllegalArgumentException {
-               if (streamId == null) {
-                       return null;
-               }
-
-               Fields outputSchema = this.outputStreams.get(streamId);
-               if (outputSchema == null) {
-                       throw new IllegalArgumentException("Stream with ID '" + 
streamId
-                                       + "' was not declared.");
-               }
-
-               Tuple t;
-               final int numberOfAttributes = outputSchema.size();
-
-               if (numberOfAttributes == 1) {
-                       return TypeExtractor.getForClass(Object.class);
-               } else if (numberOfAttributes <= 25) {
-                       try {
-                               t = 
Tuple.getTupleClass(numberOfAttributes).newInstance();
-                       } catch (final InstantiationException e) {
-                               throw new RuntimeException(e);
-                       } catch (final IllegalAccessException e) {
-                               throw new RuntimeException(e);
-                       }
-               } else {
-                       throw new IllegalArgumentException("Flink supports only 
a maximum number of 25 attributes");
-               }
-
-               // TODO: declare only key fields as DefaultComparable
-               for (int i = 0; i < numberOfAttributes; ++i) {
-                       t.setField(new DefaultComparable(), i);
-               }
-
-               return TypeExtractor.getForObject(t);
-       }
-
-       /**
-        * {@link DefaultComparable} is a {@link Comparable} helper class that 
is used to get the correct {@link
-        * TypeInformation} from {@link TypeExtractor} within {@link 
#getOutputType()}. If key fields are not comparable,
-        * Flink cannot use them and will throw an exception.
-        */
-       private static class DefaultComparable implements 
Comparable<DefaultComparable> {
-
-               public DefaultComparable() {
-               }
-
-               @Override
-               public int compareTo(final DefaultComparable o) {
-                       return 0;
-               }
-       }
-
-       /**
-        * Computes the indexes within the declared output schema of the 
specified stream, for a list of given
-        * field-grouping attributes.
-        * 
-        * @param streamId
-        *            A stream ID.
-        * @param groupingFields
-        *            The names of the key fields.
-        * 
-        * @return array of {@code int}s that contains the index within the 
output schema for each attribute in the given
-        *         list
-        */
-       public int[] getGroupingFieldIndexes(final String streamId, final 
List<String> groupingFields) {
-               final int[] fieldIndexes = new int[groupingFields.size()];
-
-               for (int i = 0; i < fieldIndexes.length; ++i) {
-                       fieldIndexes[i] = 
this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
-               }
-
-               return fieldIndexes;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
index 8c75a2c..179466e 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -28,17 +28,14 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
  * {@link FlinkClient}.
  */
-class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology extends StreamExecutionEnvironment {
 
-       /** The corresponding {@link StormTopology} that is mimicked by this 
{@link FlinkTopology} */
-       private final StormTopology stormTopology;
        /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
        private int numberOfTasks = 0;
 
-       public FlinkTopology(final StormTopology stormTopology) {
+       public FlinkTopology() {
                // Set default parallelism to 1, to mirror Storm default 
behavior
                super.setParallelism(1);
-               this.stormTopology = stormTopology;
        }
 
        /**
@@ -52,7 +49,7 @@ class FlinkTopology extends StreamExecutionEnvironment {
        public JobExecutionResult execute() throws Exception {
                throw new UnsupportedOperationException(
                                "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-                                               "instead.");
+                               "instead.");
        }
 
        /**
@@ -66,12 +63,7 @@ class FlinkTopology extends StreamExecutionEnvironment {
        public JobExecutionResult execute(final String jobName) throws 
Exception {
                throw new UnsupportedOperationException(
                                "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-                                               "instead.");
-       }
-
-       //TODO
-       public String getStormTopologyAsString() {
-               return this.stormTopology.toString();
+                               "instead.");
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e4f6c94..d62d56b 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -16,7 +16,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.generated.ComponentCommon;
@@ -35,10 +34,12 @@ import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
 import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
 import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.apache.flink.stormcompatibility.util.SplitStreamTypeKeySelector;
 import org.apache.flink.stormcompatibility.wrappers.AbstractStormSpoutWrapper;
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
 import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
 import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
@@ -73,14 +74,18 @@ public class FlinkTopologyBuilder {
        private final HashMap<String, HashMap<String, Fields>> outputStreams = 
new HashMap<String, HashMap<String, Fields>>();
        /** All spouts&bolts declarers by their ID */
        private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = 
new HashMap<String, FlinkOutputFieldsDeclarer>();
+       // needs to be a class member for internal testing purposes
+       private StormTopology stormTopology;
+
 
        /**
         * Creates a Flink program that uses the specified spouts and bolts.
         */
        @SuppressWarnings({"rawtypes", "unchecked"})
        public FlinkTopology createTopology() {
-               final StormTopology stormTopology = 
this.stormBuilder.createTopology();
-               final FlinkTopology env = new FlinkTopology(stormTopology);
+               this.stormTopology = this.stormBuilder.createTopology();
+
+               final FlinkTopology env = new FlinkTopology();
                env.setParallelism(1);
 
                final HashMap<String, HashMap<String, DataStream>> 
availableInputs = new HashMap<String, HashMap<String, DataStream>>();
@@ -102,6 +107,7 @@ public class FlinkTopologyBuilder {
                        } else {
                                spoutWrapper = new StormSpoutWrapper(userSpout);
                        }
+                       spoutWrapper.setStormTopology(stormTopology);
 
                        DataStreamSource source;
                        HashMap<String, DataStream> outputStreams = new 
HashMap<String, DataStream>();
@@ -126,6 +132,8 @@ public class FlinkTopologyBuilder {
                        if (common.is_set_parallelism_hint()) {
                                dop = common.get_parallelism_hint();
                                source.setParallelism(dop);
+                       } else {
+                               common.set_parallelism_hint(1);
                        }
                        env.increaseNumberOfTasks(dop);
                }
@@ -217,6 +225,7 @@ public class FlinkTopologyBuilder {
                                                        }
 
                                                        
SingleOutputStreamOperator outputStream;
+                                                       StormBoltWrapper 
boltWrapper;
                                                        if 
(boltOutputStreams.size() < 2) { // single output stream or sink
                                                                String 
outputStreamId = null;
                                                                if 
(boltOutputStreams.size() == 1) {
@@ -225,11 +234,9 @@ public class FlinkTopologyBuilder {
                                                                final 
TypeInformation<?> outType = declarer
                                                                                
.getOutputType(outputStreamId);
 
-                                                               outputStream = 
inputStream.transform(
-                                                                               
boltId,
-                                                                               
outType,
-                                                                               
new StormBoltWrapper(userBolt, this.outputStreams.get(
-                                                                               
                producerId).get(inputStreamId)));
+                                                               boltWrapper = 
new StormBoltWrapper(userBolt, this.outputStreams
+                                                                               
.get(producerId).get(inputStreamId));
+                                                               outputStream = 
inputStream.transform(boltId, outType, boltWrapper);
 
                                                                if (outType != 
null) {
                                                                        // only 
for non-sink nodes
@@ -241,11 +248,8 @@ public class FlinkTopologyBuilder {
                                                                final 
TypeInformation<?> outType = TypeExtractor
                                                                                
.getForClass(SplitStreamType.class);
 
-                                                               outputStream = 
inputStream.transform(
-                                                                               
boltId,
-                                                                               
outType,
-                                                                               
new StormBoltWrapper(userBolt, this.outputStreams.get(
-                                                                               
                producerId).get(inputStreamId)));
+                                                               boltWrapper = 
new StormBoltWrapper(userBolt, 
this.outputStreams.get(producerId).get(inputStreamId));
+                                                               outputStream = 
inputStream.transform(boltId, outType, boltWrapper);
 
                                                                SplitStream 
splitStreams = outputStream
                                                                                
.split(new FlinkStormStreamSelector());
@@ -256,11 +260,14 @@ public class FlinkTopologyBuilder {
                                                                }
                                                                
availableInputs.put(boltId, op);
                                                        }
+                                                       
boltWrapper.setStormTopology(stormTopology);
 
                                                        int dop = 1;
                                                        if 
(common.is_set_parallelism_hint()) {
                                                                dop = 
common.get_parallelism_hint();
                                                                
outputStream.setParallelism(dop);
+                                                       } else {
+                                                               
common.set_parallelism_hint(1);
                                                        }
                                                        
env.increaseNumberOfTasks(dop);
 
@@ -393,4 +400,8 @@ public class FlinkTopologyBuilder {
         * }
         */
 
+       // for internal testing purposes only
+       StormTopology getStormTopology() {
+               return this.stormTopology;
+       }
 }

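With the change above, a missing parallelism hint is written back into the
topology as the default of 1, so the `StormTopology` handed to each wrapper
carries complete degree-of-parallelism information. A hedged sketch of the
resulting behavior, with hypothetical `MySpout`/`MyBolt` components:

    final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
    builder.setSpout("src", new MySpout(), 2);                    // explicit hint: dop = 2
    builder.setBolt("sink", new MyBolt()).shuffleGrouping("src"); // no hint: defaults to dop = 1

    final FlinkTopology topology = builder.createTopology();
    // topology.execute() throws UnsupportedOperationException by design;
    // submit via FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead
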
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
deleted file mode 100644
index a761617..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites 
certain method that are not applicable when
- * a Storm topology is executed within Flink.
- */
-public class FlinkTopologyContext extends TopologyContext {
-
-       /**
-        * Instantiates a new {@link FlinkTopologyContext} for a given Storm 
topology. The context object is instantiated
-        * for each parallel task
-        *
-        * @param topology
-        *              The Storm topology that is currently executed
-        * @param taskToComponents
-        *              A map from task IDs to Component IDs
-        * @param taskId
-        *              The ID of the task the context belongs to.
-        */
-       public FlinkTopologyContext(final StormTopology topology, final 
Map<Integer, String> taskToComponents,
-                       final Integer taskId) {
-               super(topology, null, taskToComponents, null, null, null, null, 
null, taskId, null, null, null, null, null,
-                               null, null);
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public void addTaskHook(final ITaskHook hook) {
-               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public Collection<ITaskHook> getHooks() {
-               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public IMetric getRegisteredMetricByName(final String name) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @SuppressWarnings("rawtypes")
-       @Override
-       public CombinedMetric registerMetric(final String name, final ICombiner 
combiner, final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @SuppressWarnings("rawtypes")
-       @Override
-       public ReducedMetric registerMetric(final String name, final IReducer 
combiner, final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @SuppressWarnings("unchecked")
-       @Override
-       public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public <T extends ISubscribedState> T setAllSubscribedState(final T 
obj) {
-               throw new UnsupportedOperationException("Not supported by 
Flink");
-
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final T obj) {
-               throw new UnsupportedOperationException("Not supported by 
Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final String streamId, final T
-                       obj) {
-               throw new UnsupportedOperationException("Not supported by 
Flink");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
new file mode 100644
index 0000000..114fa7c
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * This interface represents a Storm spout that emits a finite number of 
records. Common Storm
+ * spouts emit infinite streams by default. To change this behaviour and take 
advantage of
+ * Flink's finite-source capabilities, the spout should implement this 
interface. To wrap
+ * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
+ */
+public interface FiniteStormSpout extends IRichSpout {
+
+       /**
+        * When this method returns true, the spout has reached the end of the stream.
+        *
+        * @return true, if the spout's stream reached its end, false otherwise
+        */
+       public boolean reachedEnd();
+
+}

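A minimal sketch of a user spout implementing this interface, assuming Storm's
`BaseRichSpout` convenience base class; the class and field names are
illustrative only:

    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class FiniteRangeSpout extends BaseRichSpout implements FiniteStormSpout {
        private static final long serialVersionUID = 1L;

        private final int limit;
        private int counter = 0;
        private SpoutOutputCollector collector;

        public FiniteRangeSpout(final int limit) {
            this.limit = limit;
        }

        @SuppressWarnings("rawtypes")
        @Override
        public void open(final Map conf, final TopologyContext context,
                final SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            if (!reachedEnd()) {
                this.collector.emit(new Values(this.counter++));
            }
        }

        @Override
        public boolean reachedEnd() {
            // lets Flink terminate the source cleanly instead of spinning forever
            return this.counter >= this.limit;
        }

        @Override
        public void declareOutputFields(final OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }
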
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
new file mode 100644
index 0000000..3eee8d6
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema 
of a
+ * {@link backtype.storm.topology.IRichSpout spout} or {@link 
backtype.storm.topology.IRichBolt bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Flink does not support direct emit.</strong>
+ */
+public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+       /** The declared output streams and schemas. */
+       public final HashMap<String, Fields> outputStreams = new 
HashMap<String, Fields>();
+
+       @Override
+       public void declare(final Fields fields) {
+               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+       }
+
+       /**
+        * {@inheritDoc}
+        * <p/>
+        * Direct emit is not supported by Flink. Parameter {@code direct} must 
be {@code false}.
+        *
+        * @throws UnsupportedOperationException
+        *              if {@code direct} is {@code true}
+        */
+       @Override
+       public void declare(final boolean direct, final Fields fields) {
+               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+       }
+
+       @Override
+       public void declareStream(final String streamId, final Fields fields) {
+               this.declareStream(streamId, false, fields);
+       }
+
+       /**
+        * {@inheritDoc}
+        * <p/>
+        * Direct emit is not supported by Flink. Parameter {@code direct} must 
be {@code false}.
+        *
+        * @throws UnsupportedOperationException
+        *              if {@code direct} is {@code true}
+        */
+       @Override
+       public void declareStream(final String streamId, final boolean direct, 
final Fields fields) {
+               if (direct) {
+                       throw new UnsupportedOperationException("Direct emit is 
not supported by Flink");
+               }
+
+               this.outputStreams.put(streamId, fields);
+       }
+
+       /**
+        * Returns {@link TypeInformation} for the declared output schema for a 
specific stream.
+        * 
+        * @param streamId
+        *            A stream ID.
+        * 
+        * @return output type information for the declared output schema of 
the specified stream; or {@code null} if
+        *         {@code streamId == null}
+        * 
+        * @throws IllegalArgumentException
+        *             If no output schema was declared for the specified 
stream or if more than 25 attributes were declared.
+        */
+       public TypeInformation<?> getOutputType(final String streamId) throws 
IllegalArgumentException {
+               if (streamId == null) {
+                       return null;
+               }
+
+               Fields outputSchema = this.outputStreams.get(streamId);
+               if (outputSchema == null) {
+                       throw new IllegalArgumentException("Stream with ID '" + 
streamId
+                                       + "' was not declared.");
+               }
+
+               Tuple t;
+               final int numberOfAttributes = outputSchema.size();
+
+               if (numberOfAttributes == 1) {
+                       return TypeExtractor.getForClass(Object.class);
+               } else if (numberOfAttributes <= 25) {
+                       try {
+                               t = 
Tuple.getTupleClass(numberOfAttributes).newInstance();
+                       } catch (final InstantiationException e) {
+                               throw new RuntimeException(e);
+                       } catch (final IllegalAccessException e) {
+                               throw new RuntimeException(e);
+                       }
+               } else {
+                       throw new IllegalArgumentException("Flink supports only 
a maximum number of 25 attributes");
+               }
+
+               // TODO: declare only key fields as DefaultComparable
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       t.setField(new DefaultComparable(), i);
+               }
+
+               return TypeExtractor.getForObject(t);
+       }
+
+       /**
+        * {@link DefaultComparable} is a {@link Comparable} helper class that 
is used to get the correct {@link
+        * TypeInformation} from {@link TypeExtractor} within {@link 
#getOutputType(String)}. If key fields are not comparable,
+        * Flink cannot use them and will throw an exception.
+        */
+       private static class DefaultComparable implements 
Comparable<DefaultComparable> {
+
+               public DefaultComparable() {
+               }
+
+               @Override
+               public int compareTo(final DefaultComparable o) {
+                       return 0;
+               }
+       }
+
+       /**
+        * Computes the indexes within the declared output schema of the 
specified stream, for a list of given
+        * field-grouping attributes.
+        * 
+        * @param streamId
+        *            A stream ID.
+        * @param groupingFields
+        *            The names of the key fields.
+        * 
+        * @return array of {@code int}s that contains the index within the 
output schema for each attribute in the given
+        *         list
+        */
+       public int[] getGroupingFieldIndexes(final String streamId, final 
List<String> groupingFields) {
+               final int[] fieldIndexes = new int[groupingFields.size()];
+
+               for (int i = 0; i < fieldIndexes.length; ++i) {
+                       fieldIndexes[i] = 
this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
+               }
+
+               return fieldIndexes;
+       }
+
+}

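For illustration, how a wrapper can use this declarer to derive Flink type
information from a component's declared schema; `DeclarerUsageSketch` is a
hypothetical helper, not part of this change:

    import java.util.Collections;

    import backtype.storm.topology.IRichBolt;
    import backtype.storm.utils.Utils;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;

    public final class DeclarerUsageSketch {

        public static TypeInformation<?> outputTypeOf(final IRichBolt bolt) {
            final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
            bolt.declareOutputFields(declarer); // e.g., declares new Fields("word", "count")

            // indexes of key attributes within the declared schema, e.g., for fields grouping
            final int[] keyIndexes = declarer.getGroupingFieldIndexes(
                    Utils.DEFAULT_STREAM_ID, Collections.singletonList("word"));
            assert keyIndexes.length == 1;

            // Tuple-shaped TypeInformation for the default output stream
            return declarer.getOutputType(Utils.DEFAULT_STREAM_ID);
        }
    }
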
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
index 7ca45d6..7e60a87 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
@@ -27,7 +27,7 @@ import 
org.apache.flink.streaming.api.collector.selector.OutputSelector;
 /**
  * Used by {@link FlinkTopologyBuilder} to split multiple declared output 
streams within Flink.
  */
-final public class FlinkStormStreamSelector<T> implements 
OutputSelector<SplitStreamType<T>> {
+public final class FlinkStormStreamSelector<T> implements 
OutputSelector<SplitStreamType<T>> {
        private static final long serialVersionUID = 2553423379715401023L;
 
        /** internal cache to avoid short living ArrayList objects. */

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
new file mode 100644
index 0000000..14af830
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.hooks.ITaskHook;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.state.ISubscribedState;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import clojure.lang.Atom;
+
+/**
+ * {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides 
certain methods that are not applicable when
+ * a Storm topology is executed within Flink.
+ */
+public final class FlinkTopologyContext extends TopologyContext {
+
+       /**
+        * Instantiates a new {@link FlinkTopologyContext} for a given Storm 
topology. The context object is instantiated
+        * for each parallel task.
+        */
+       public FlinkTopologyContext(final StormTopology topology, 
@SuppressWarnings("rawtypes") final Map stormConf,
+                       final Map<Integer, String> taskToComponent, final 
Map<String, List<Integer>> componentToSortedTasks,
+                       final Map<String, Map<String, Fields>> 
componentToStreamToFields, final String stormId, final String codeDir,
+                       final String pidDir, final Integer taskId, final 
Integer workerPort, final List<Integer> workerTasks,
+                       final Map<String, Object> defaultResources, final 
Map<String, Object> userResources,
+                       final Map<String, Object> executorData, 
@SuppressWarnings("rawtypes") final Map registeredMetrics,
+                       final Atom openOrPrepareWasCalled) {
+               super(topology, stormConf, taskToComponent, 
componentToSortedTasks, componentToStreamToFields, stormId,
+                               codeDir, pidDir, taskId, workerPort, 
workerTasks, defaultResources, userResources, executorData,
+                               registeredMetrics, openOrPrepareWasCalled);
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public void addTaskHook(final ITaskHook hook) {
+               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public Collection<ITaskHook> getHooks() {
+               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public IMetric getRegisteredMetricByName(final String name) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public CombinedMetric registerMetric(final String name, final ICombiner 
combiner, final int timeBucketSizeInSecs) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public ReducedMetric registerMetric(final String name, final IReducer 
combiner, final int timeBucketSizeInSecs) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public <T extends ISubscribedState> T setAllSubscribedState(final T 
obj) {
+               throw new UnsupportedOperationException("Not supported by 
Flink");
+
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final T obj) {
+               throw new UnsupportedOperationException("Not supported by 
Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final String streamId, final T
+                       obj) {
+               throw new UnsupportedOperationException("Not supported by 
Flink");
+       }
+
+}

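The unsupported operations fail fast at runtime, while the plain metadata
accessors inherited from `TopologyContext` work against the fully populated
context. A hedged sketch of what bolt code can expect, e.g. inside
`IRichBolt.prepare()`:

    // plain metadata accessors work against the populated context
    final String componentId = context.getThisComponentId();
    final int taskId = context.getThisTaskId();

    try {
        // metrics are one of the overridden, unsupported features
        context.registerMetric("counter", new backtype.storm.metric.api.CountMetric(), 10);
    } catch (final UnsupportedOperationException e) {
        // expected: "Metrics are not supported by Flink"
    }
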
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
new file mode 100644
index 0000000..200f772
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
+
+/**
+ * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for 
streams that are selected via
+ * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares 
multiple output streams.
+ * 
+ * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} 
tuples and applies a regular
+ * {@link ArrayKeySelector} on it.
+ */
+public class SplitStreamTypeKeySelector implements 
KeySelector<SplitStreamType<Tuple>, Tuple> {
+       private static final long serialVersionUID = 4672434660037669254L;
+
+       private final ArrayKeySelector<Tuple> selector;
+
+       public SplitStreamTypeKeySelector(int... fields) {
+               this.selector = new 
KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
+       }
+
+       @Override
+       public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
+               return selector.getKey(value.value);
+       }
+
+}

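A small self-contained sketch of the selector; it assumes `SplitStreamType`
exposes public `streamId`/`value` fields, as the `getKey()` implementation
above implies:

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;

    public final class KeySelectorSketch {
        public static void main(final String[] args) throws Exception {
            final SplitStreamType<Tuple> wrapped = new SplitStreamType<Tuple>();
            wrapped.streamId = "secondStream";
            wrapped.value = new Tuple2<String, Integer>("flink", 42);

            // selects attribute 0 of the wrapped tuple as the grouping key
            final SplitStreamTypeKeySelector keySelector = new SplitStreamTypeKeySelector(0);
            final Tuple key = keySelector.getKey(wrapped); // a Tuple holding "flink"
            System.out.println(key);
        }
    }
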
http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index c531580..ccd29bb 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -20,6 +20,7 @@ package org.apache.flink.stormcompatibility.wrappers;
 import java.util.Collection;
 import java.util.HashMap;
 
+import backtype.storm.generated.StormTopology;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.topology.IRichSpout;
 
@@ -43,22 +44,16 @@ import 
org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 public abstract class AbstractStormSpoutWrapper<OUT> extends 
RichParallelSourceFunction<OUT> {
        private static final long serialVersionUID = 4993283609095408765L;
 
-       /**
-        * Number of attributes of the bolt's output tuples per stream.
-        */
+       /** Number of attributes of the bolt's output tuples per stream. */
        private final HashMap<String, Integer> numberOfAttributes;
-       /**
-        * The wrapped Storm {@link IRichSpout spout}.
-        */
+       /** The wrapped Storm {@link IRichSpout spout}. */
        protected final IRichSpout spout;
-       /**
-        * The wrapper of the given Flink collector.
-        */
+       /** The wrapper of the given Flink collector. */
        protected StormSpoutCollector<OUT> collector;
-       /**
-        * Indicates, if the source is still running or was canceled.
-        */
+       /** Indicates, if the source is still running or was canceled. */
        protected volatile boolean isRunning = true;
+       /** The original Storm topology. */
+       protected StormTopology stormTopology;
 
        /**
         * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the 
given Storm {@link IRichSpout spout} such
@@ -98,6 +93,16 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends 
RichParallelSourceF
                this.numberOfAttributes = 
StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
        }
 
+       /**
+        * Sets the original Storm topology.
+        * 
+        * @param stormTopology
+        *            The original Storm topology.
+        */
+       public void setStormTopology(StormTopology stormTopology) {
+               this.stormTopology = stormTopology;
+       }
+
        @Override
        public final void run(final SourceContext<OUT> ctx) throws Exception {
                this.collector = new 
StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
@@ -114,8 +119,11 @@ public abstract class AbstractStormSpoutWrapper<OUT> 
extends RichParallelSourceF
                }
 
                this.spout.open(stormConfig,
-                               StormWrapperSetupHelper
-                               .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+                               StormWrapperSetupHelper.createTopologyContext(
+                                       (StreamingRuntimeContext) super.getRuntimeContext(),
+                                       this.spout,
+                                       this.stormTopology,
+                                       null),
                                new SpoutOutputCollector(this.collector));
                this.spout.activate();
                this.execute();
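
The new setStormTopology(...) hook is what FlinkTopologyBuilder can use to hand the assembled topology to each wrapper before execution. A minimal wiring sketch, assuming the concrete StormSpoutWrapper subclass and its single-argument constructor; MyStormSpout is a hypothetical spout implementation:

    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.TopologyBuilder;

    // Propagate the original topology into the wrapper so that
    // createTopologyContext() can build a fully populated TopologyContext.
    IRichSpout mySpout = new MyStormSpout(); // hypothetical spout
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("source", mySpout);

    StormSpoutWrapper<Object> wrapper = new StormSpoutWrapper<Object>(mySpout);
    wrapper.setStormTopology(builder.createTopology());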

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
deleted file mode 100644
index 58a4f7a..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpout.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * This interface represents a Storm spout that emits a finite number of records. Common Storm
- * spouts emit infinite streams by default. To change this behaviour and take advantage of
- * Flink's finite-source capabilities, the spout should implement this interface. To wrap
- * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
- */
-public interface FiniteStormSpout extends IRichSpout {
-
-       /**
-        * When returns true, the spout has reached the end of the stream.
-        *
-        * @return true, if the spout's stream reached its end, false otherwise
-        */
-       public boolean reachedEnd();
-
-}
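
The interface itself is unchanged; it merely moved to the org.apache.flink.stormcompatibility.util package. A minimal sketch of an implementation (all names illustrative): a spout that emits 1000 numbers and then signals the end of its stream via reachedEnd():

    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    import org.apache.flink.stormcompatibility.util.FiniteStormSpout;

    public class CountingSpout extends BaseRichSpout implements FiniteStormSpout {
        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector collector;
        private int counter = 0;

        @SuppressWarnings("rawtypes")
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            this.collector.emit(new Values(this.counter++));
        }

        // Flink's finite-source support stops the source once this returns true.
        @Override
        public boolean reachedEnd() {
            return this.counter >= 1000;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }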

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
index 1912afc..f499ecc 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 
 import com.google.common.collect.Sets;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
new file mode 100644
index 0000000..3cd27d4
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import java.util.HashMap;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * {@link SetupOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
+ * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
+ * method.
+ */
+class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+       /** The number of attributes for each stream declared by the wrapped operator. */
+       HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
+
+       @Override
+       public void declare(final Fields fields) {
+               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+       }
+
+       @Override
+       public void declare(final boolean direct, final Fields fields) {
+               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+       }
+
+       @Override
+       public void declareStream(final String streamId, final Fields fields) {
+               this.declareStream(streamId, false, fields);
+       }
+
+       @Override
+       public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+               if (streamId == null) {
+                       throw new IllegalArgumentException("Stream ID cannot be null.");
+               }
+               if (direct) {
+                       throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+               }
+               }
+
+               this.outputSchemas.put(streamId, fields.size());
+       }
+
+}
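
A short usage sketch of the declarer above, mirroring what getNumberOfAttributes() in StormWrapperSetupHelper does internally (the class is package-private, so this only works from the wrappers package; myBolt is a hypothetical IRichBolt):

    // Let the component declare its output fields, then read off the
    // stream-to-arity map that the declarer collected.
    SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
    myBolt.declareOutputFields(declarer);
    for (java.util.Map.Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) {
        System.out.println(schema.getKey() + " -> " + schema.getValue() + " attributes");
    }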

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index 6b58b0a..715d6df 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -19,6 +19,7 @@ package org.apache.flink.stormcompatibility.wrappers;
 import java.util.Collection;
 import java.util.HashMap;
 
+import backtype.storm.generated.StormTopology;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
@@ -59,6 +60,8 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
        private final HashMap<String, Integer> numberOfAttributes;
        /** The schema (ie, ordered field names) of the input stream. */
        private final Fields inputSchema;
+       /** The original Storm topology. */
+       protected StormTopology stormTopology;
 
        /**
         *  We have to use this because Operators must output
@@ -193,12 +196,22 @@ public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> imple
                this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
        }
 
+       /**
+        * Sets the original Storm topology.
+        * 
+        * @param stormTopology
+        *            The original Storm topology.
+        */
+       public void setStormTopology(StormTopology stormTopology) {
+               this.stormTopology = stormTopology;
+       }
+
        @Override
        public void open(final Configuration parameters) throws Exception {
                super.open(parameters);
 
-               final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(
-                               super.runtimeContext, false);
+               final TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
+                               super.runtimeContext, this.bolt, this.stormTopology, null);
                flinkCollector = new TimestampedCollector<OUT>(output);
                OutputCollector stormCollector = null;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
deleted file mode 100644
index f33d4d3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * {@link StormOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
- * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
- * method.
- */
-class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-       /** The number of attributes for each declared stream by the wrapped operator. */
-       HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
-
-       @Override
-       public void declare(final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-       }
-
-       @Override
-       public void declare(final boolean direct, final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-       }
-
-       @Override
-       public void declareStream(final String streamId, final Fields fields) {
-               this.declareStream(streamId, false, fields);
-       }
-
-       @Override
-       public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-               if (streamId == null) {
-                       throw new IllegalArgumentException("Stream ID cannot be null.");
-               }
-               if (direct) {
-                       throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-               }
-               }
-
-               this.outputSchemas.put(streamId, fields.size());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index 75ab8e0..891497e 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -14,32 +14,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.Config;
 import backtype.storm.generated.Bolt;
 import backtype.storm.generated.ComponentCommon;
 import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.StreamInfo;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
 
-import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
+import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
+import clojure.lang.Atom;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or
+ * {@link StormWrapperSetupHelper} is a helper class used by {@link AbstractStormSpoutWrapper} and
  * {@link StormBoltWrapper}.
  */
 class StormWrapperSetupHelper {
 
+       /** The configuration key for the topology name. */
+       final static String TOPOLOGY_NAME = "storm.topology.name";
+
        /**
         * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}
         * per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for
@@ -60,7 +71,7 @@ class StormWrapperSetupHelper {
        public static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt,
                        final Collection<String> rawOutputs)
                                        throws IllegalArgumentException {
-               final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
                spoutOrBolt.declareOutputFields(declarer);
 
                for (Entry<String, Integer> schema : declarer.outputSchemas.entrySet()) {
@@ -84,27 +95,174 @@ class StormWrapperSetupHelper {
                return declarer.outputSchemas;
        }
 
-       // TODO
-       public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context,
-                       final boolean spoutOrBolt) {
-               final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask());
+       /** Used to compute unique task IDs for a Storm topology. */
+       private static int tid;
+
+       /**
+        * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, Flink task / Storm executor).
+        * 
+        * @param context
+        *            The Flink runtime context.
+        * @param spoutOrBolt
+        *            The Spout or Bolt this context is created for.
+        * @param stormTopology
+        *            The original Storm topology.
+        * @param stormConfig
+        *            The user provided configuration.
+        * @return The created {@link TopologyContext}.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       public static synchronized TopologyContext createTopologyContext(
+                       final StreamingRuntimeContext context, final IComponent spoutOrBolt,
+                       StormTopology stormTopology, Map stormConfig) {
+               String operatorName = context.getTaskName();
+               if (operatorName.startsWith("Source: ")) {
+                       // prefix "Source: " is inserted by Flink sources by 
default -- need to get rid of it here
+                       operatorName = operatorName.substring(8);
+               }
+               final int dop = context.getNumberOfParallelSubtasks();
 
                final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
-               taskToComponents.put(taskId, context.getTaskName());
+               final Map<String, List<Integer>> componentToSortedTasks = new HashMap<String, List<Integer>>();
+               final Map<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
+               if (stormConfig == null) {
+                       // both wrappers pass a null configuration; fall back to an empty one to avoid an NPE below
+                       stormConfig = new HashMap();
+               }
+               String stormId = (String) stormConfig.get(TOPOLOGY_NAME);
+               String codeDir = null; // not supported
+               String pidDir = null; // not supported
+               Integer taskId = null;
+               Integer workerPort = null; // not supported
+               List<Integer> workerTasks = new ArrayList<Integer>();
+               final Map<String, Object> defaultResources = new HashMap<String, Object>();
+               final Map<String, Object> userResources = new HashMap<String, Object>();
+               final Map<String, Object> executorData = new HashMap<String, Object>();
+               final Map registeredMetrics = new HashMap();
+               Atom openOrPrepareWasCalled = null;
 
-               final ComponentCommon common = new ComponentCommon();
-               common.set_parallelism_hint(context.getNumberOfParallelSubtasks());
+               if (stormTopology == null) {
+                       // embedded mode
+                       ComponentCommon common = new ComponentCommon();
+                       common.set_parallelism_hint(dop);
 
-               final Map<String, Bolt> bolts = new HashMap<String, Bolt>();
-               final Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
+                       HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>();
+                       HashMap<String, Bolt> bolts = new HashMap<String, Bolt>();
+                       if (spoutOrBolt instanceof IRichSpout) {
+                               spouts.put(operatorName, new SpoutSpec(null, common));
+                       } else {
+                               assert (spoutOrBolt instanceof IRichBolt);
+                               bolts.put(operatorName, new Bolt(null, common));
+                       }
+                       stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>());
 
-               if (spoutOrBolt) {
-                       spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common));
+                       taskId = context.getIndexOfThisSubtask() + 1; // task IDs are 1-based (cf. loop below)
+
+                       List<Integer> sortedTasks = new ArrayList<Integer>(dop);
+                       for (int i = 1; i <= dop; ++i) {
+                               taskToComponents.put(i, operatorName);
+                               sortedTasks.add(i);
+                       }
+                       componentToSortedTasks.put(operatorName, sortedTasks);
+
+                       FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+                       spoutOrBolt.declareOutputFields(declarer);
+                       componentToStreamToFields.put(operatorName, declarer.outputStreams);
                } else {
-                       bolts.put(context.getTaskName(), new Bolt(null, common));
+                       // whole topology is built (ie, FlinkTopologyBuilder is used)
+                       Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
+                       Map<String, Bolt> bolts = stormTopology.get_bolts();
+                       Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts();
+
+                       tid = 1;
+
+                       for (Entry<String, SpoutSpec> spout : spouts.entrySet()) {
+                               Integer rc = processSingleOperator(spout.getKey(), spout.getValue().get_common(),
+                                               operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
+                                               componentToSortedTasks, componentToStreamToFields);
+                               if (rc != null) {
+                                       taskId = rc;
+                               }
+                       }
+                       for (Entry<String, Bolt> bolt : bolts.entrySet()) {
+                               Integer rc = processSingleOperator(bolt.getKey(), bolt.getValue().get_common(),
+                                               operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
+                                               componentToSortedTasks, componentToStreamToFields);
+                               if (rc != null) {
+                                       taskId = rc;
+                               }
+                       }
+                       for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) {
+                               Integer rc = processSingleOperator(stateSpout.getKey(), stateSpout
+                                               .getValue().get_common(), operatorName, context.getIndexOfThisSubtask(),
+                                               dop, taskToComponents, componentToSortedTasks, componentToStreamToFields);
+                               if (rc != null) {
+                                       taskId = rc;
+                               }
+                       }
+                       assert (taskId != null);
+               }
+
+               if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+                       stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value
+               }
+
+               return new FlinkTopologyContext(stormTopology, stormConfig, taskToComponents,
+                               componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir,
+                               taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
+                               registeredMetrics, openOrPrepareWasCalled);
+       }
+
+       /**
+        * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a
+        * single instance of a Spout or Bolt (ie, task or executor). Furthermore, it computes the unique task ID.
+        * 
+        * @param componentId
+        *            The ID of the Spout/Bolt in the topology.
+        * @param common
+        *            The common operator object (that all Spouts and Bolts have).
+        * @param operatorName
+        *            The Flink operator name.
+        * @param index
+        *            The index of the currently processed task within its operator.
+        * @param dop
+        *            The parallelism of the operator.
+        * @param taskToComponents
+        *            OUTPUT: A map from all task IDs of the topology to their component IDs.
+        * @param componentToSortedTasks
+        *            OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs.
+        * @param componentToStreamToFields
+        *            OUTPUT: A map from all component IDs to their output streams and output fields.
+        * 
+        * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current
+        *         Flink operator ({@code operatorName}) -- {@code null} otherwise.
+        */
+       private static Integer processSingleOperator(final String componentId,
+                       final ComponentCommon common, final String operatorName, final int index,
+                       final int dop, final Map<Integer, String> taskToComponents,
+                       final Map<String, List<Integer>> componentToSortedTasks,
+                       final Map<String, Map<String, Fields>> componentToStreamToFields) {
+               final int parallelism_hint = common.get_parallelism_hint();
+               Integer taskId = null;
+
+               if (componentId.equals(operatorName)) {
+                       taskId = tid + index;
+               }
+
+               List<Integer> sortedTasks = new ArrayList<Integer>(dop);
+               for (int i = 0; i < parallelism_hint; ++i) {
+                       taskToComponents.put(tid, componentId);
+                       sortedTasks.add(tid);
+                       ++tid;
+               }
+               componentToSortedTasks.put(componentId, sortedTasks);
+
+               Map<String, Fields> outputStreams = new HashMap<String, Fields>();
+               for (Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) {
+                       outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields()));
                }
+               componentToStreamToFields.put(componentId, outputStreams);
 
-               return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
+               return taskId;
        }
 
 }
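
A standalone sketch of the task-ID numbering that createTopologyContext() and processSingleOperator() implement: task IDs are handed out consecutively starting at 1, component by component, and the subtask running the current operator gets its component's first ID plus the subtask index. Component names and parallelisms below are hypothetical:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TaskIdNumberingSketch {
        public static void main(String[] args) {
            // Topology: spout "src" with parallelism 2, bolt "sink" with parallelism 3.
            String[] components = { "src", "sink" };
            int[] parallelism = { 2, 3 };

            // LinkedHashMap only so the printout below follows assignment order.
            Map<Integer, String> taskToComponents = new LinkedHashMap<Integer, String>();
            int tid = 1;
            for (int c = 0; c < components.length; ++c) {
                for (int i = 0; i < parallelism[c]; ++i) {
                    taskToComponents.put(tid++, components[c]);
                }
            }
            // Prints {1=src, 2=src, 3=sink, 4=sink, 5=sink}; the second "sink"
            // subtask (index 1) is thus assigned task ID 3 + 1 = 4.
            System.out.println(taskToComponents);
        }
    }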

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
deleted file mode 100644
index 08ac60b..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.LinkedList;
-
-public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-
-
-
-       @Test
-       public void testNull() {
-               Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
-       }
-
-       @Test
-       public void testDeclare() {
-               for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
-                       for (int j = 1; j < 2; ++j) { // number of streams
-                                       for (int k = 0; k <= 25; ++k) { // number of attributes
-                                       this.runDeclareTest(i, j, k);
-                               }
-                       }
-               }
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testDeclareSimpleToManyAttributes() {
-               this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testDeclareNonDirectToManyAttributes() {
-               this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testDeclareDefaultStreamToManyAttributes() {
-               this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testDeclareFullToManyAttributes() {
-               this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
-       }
-
-       private void runDeclareTest(final int testCase, final int numberOfStreams,
-                       final int numberOfAttributes) {
-               final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-
-               String[] streams = null;
-               if (numberOfStreams > 1 || r.nextBoolean()) {
-                       streams = new String[numberOfStreams];
-                       for (int i = 0; i < numberOfStreams; ++i) {
-                               streams[i] = "stream" + i;
-                       }
-               }
-
-               final String[] attributes = new String[numberOfAttributes];
-               for (int i = 0; i < attributes.length; ++i) {
-                       attributes[i] = "a" + i;
-               }
-
-               switch (testCase) {
-               case 0:
-                       this.declareSimple(declarer, streams, attributes);
-                       break;
-               default:
-                       this.declareNonDirect(declarer, streams, attributes);
-               }
-
-               if (streams == null) {
-                       streams = new String[] { Utils.DEFAULT_STREAM_ID };
-               }
-
-               for (String stream : streams) {
-                       final TypeInformation<?> type = declarer.getOutputType(stream);
-
-                       if (numberOfAttributes == 1) {
-                               Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
-                               Assert.assertEquals(type.getTypeClass(), Object.class);
-                       } else {
-                               Assert.assertEquals(numberOfAttributes, type.getArity());
-                               Assert.assertTrue(type.isTupleType());
-                       }
-               }
-       }
-
-       private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
-                       final String[] attributes) {
-
-               if (streams != null) {
-                       for (String stream : streams) {
-                               declarer.declareStream(stream, new Fields(attributes));
-                       }
-               } else {
-                       declarer.declare(new Fields(attributes));
-               }
-       }
-
-       private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
-                       final String[] attributes) {
-
-               if (streams != null) {
-                       for (String stream : streams) {
-                               declarer.declareStream(stream, false, new Fields(attributes));
-                       }
-               } else {
-                       declarer.declare(false, new Fields(attributes));
-               }
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testUndeclared() {
-               final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-               declarer.getOutputType("unknownStreamId");
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testDeclareDirect() {
-               new FlinkOutputFieldsDeclarer().declare(true, null);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testDeclareDirect2() {
-               new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
-       }
-
-       @Test
-       public void testGetGroupingFieldIndexes() {
-               final int numberOfAttributes = 5 + this.r.nextInt(21);
-               final String[] attributes = new String[numberOfAttributes];
-               for (int i = 0; i < numberOfAttributes; ++i) {
-                       attributes[i] = "a" + i;
-               }
-
-               final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-               declarer.declare(new Fields(attributes));
-
-               final int numberOfKeys = 1 + this.r.nextInt(25);
-               final LinkedList<String> groupingFields = new LinkedList<String>();
-               final boolean[] indexes = new boolean[numberOfAttributes];
-
-               for (int i = 0; i < numberOfAttributes; ++i) {
-                       if (this.r.nextInt(26) < numberOfKeys) {
-                               groupingFields.add(attributes[i]);
-                               indexes[i] = true;
-                       } else {
-                               indexes[i] = false;
-                       }
-               }
-
-               final int[] expectedResult = new int[groupingFields.size()];
-               int j = 0;
-               for (int i = 0; i < numberOfAttributes; ++i) {
-                       if (indexes[i]) {
-                               expectedResult[j++] = i;
-                       }
-               }
-
-               final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
-                               groupingFields);
-
-               Assert.assertEquals(expectedResult.length, result.length);
-               for (int i = 0; i < expectedResult.length; ++i) {
-                       Assert.assertEquals(expectedResult[i], result[i]);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
deleted file mode 100644
index d214610..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import org.junit.Test;
-
-public class FlinkTopologyContextTest {
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testAddTaskHook() {
-               new FlinkTopologyContext(null, null, null).addTaskHook(null);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testGetHooks() {
-               new FlinkTopologyContext(null, null, null).getHooks();
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Test(expected = UnsupportedOperationException.class)
-       public void testRegisteredMetric1() {
-               new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner) null, 0);
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Test(expected = UnsupportedOperationException.class)
-       public void testRegisteredMetric2() {
-               new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer) null, 0);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testRegisteredMetric3() {
-               new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric) null, 0);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testGetRegisteredMetricByName() {
-               new FlinkTopologyContext(null, null, null).getRegisteredMetricByName(null);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testSetAllSubscribedState() {
-               new FlinkTopologyContext(null, null, null).setAllSubscribedState(null);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testSetSubscribedState1() {
-               new FlinkTopologyContext(null, null, null).setSubscribedState(null, null);
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testSetSubscribedState2() {
-               new FlinkTopologyContext(null, null, null).setSubscribedState(null, null, null);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
index f179919..c98c9a3 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
@@ -24,23 +24,23 @@ public class FlinkTopologyTest {
 
        @Test
        public void testDefaultParallelism() {
-               final FlinkTopology topology = new FlinkTopology(null);
+               final FlinkTopology topology = new FlinkTopology();
                Assert.assertEquals(1, topology.getParallelism());
        }
 
        @Test(expected = UnsupportedOperationException.class)
        public void testExecute() throws Exception {
-               new FlinkTopology(null).execute();
+               new FlinkTopology().execute();
        }
 
        @Test(expected = UnsupportedOperationException.class)
        public void testExecuteWithName() throws Exception {
-               new FlinkTopology(null).execute(null);
+               new FlinkTopology().execute(null);
        }
 
        @Test
        public void testNumberOfTasks() {
-               final FlinkTopology topology = new FlinkTopology(null);
+               final FlinkTopology topology = new FlinkTopology();
 
                Assert.assertEquals(0, topology.getNumberOfTasks());
 
@@ -56,7 +56,7 @@ public class FlinkTopologyTest {
 
        @Test(expected = AssertionError.class)
        public void testAssert() {
-               new FlinkTopology(null).increaseNumberOfTasks(0);
+               new FlinkTopology().increaseNumberOfTasks(0);
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
new file mode 100644
index 0000000..f664e58
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.api;
+
+import backtype.storm.generated.StormTopology;
+
+public class TestTopologyBuilder extends FlinkTopologyBuilder {
+       @Override
+       public StormTopology getStormTopology() {
+               return super.getStormTopology();
+       }
+}
