http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
index 2b3b6a8..46bf929 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
@@ -17,12 +17,12 @@
  */
 package org.apache.flink.storm.split;
 
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import backtype.storm.topology.TopologyBuilder;
 import org.apache.flink.storm.split.operators.RandomSpout;
 import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.storm.util.OutputFormatter;
 import org.apache.flink.storm.util.BoltFileSink;
 import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.OutputFormatter;
 import org.apache.flink.storm.util.TupleOutputFormatter;
 
 public class SplitSpoutTopology {
@@ -32,8 +32,8 @@ public class SplitSpoutTopology {
        public final static String sinkId = "sink";
 	private final static OutputFormatter formatter = new TupleOutputFormatter();
 
-       public static FlinkTopologyBuilder buildTopology() {
-               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+       public static TopologyBuilder buildTopology() {
+               final TopologyBuilder builder = new TopologyBuilder();
 
                builder.setSpout(spoutId, new RandomSpout(true, seed));
 		builder.setBolt(evenVerifierId, new VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
index e2c22f9..19d5873 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
@@ -16,10 +16,11 @@
  */
 package org.apache.flink.storm.split;
 
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
-
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class SplitStreamBoltLocal {
        public final static String topologyId = "Bolt split stream example";
@@ -35,16 +36,13 @@ public class SplitStreamBoltLocal {
                }
 
                // build Topology the Storm way
-		final FlinkTopologyBuilder builder = SplitBoltTopology.buildTopology();
+		final TopologyBuilder builder = SplitBoltTopology.buildTopology();
 
-		// execute program locally
 		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
+		cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
 
-               Utils.sleep(5 * 1000);
+               Utils.sleep(10 * 1000);
 
-               // TODO kill does no do anything so far
-               cluster.killTopology(topologyId);
                cluster.shutdown();
        }
 

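For reference, the pattern that replaces FlinkTopologyBuilder throughout these examples: the topology is assembled with a plain Storm TopologyBuilder and only wrapped into a Flink job at submission time. A minimal runnable sketch, reusing the spout, sink, and formatter classes from the diffs above (component names are illustrative; the RandomSpout flag is assumed to select single-stream output, and BoltPrintSink is assumed to take an OutputFormatter, matching its use in the examples):

    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.utils.Utils;
    import org.apache.flink.storm.api.FlinkLocalCluster;
    import org.apache.flink.storm.api.FlinkTopology;
    import org.apache.flink.storm.split.operators.RandomSpout;
    import org.apache.flink.storm.util.BoltPrintSink;
    import org.apache.flink.storm.util.TupleOutputFormatter;

    public class LocalSubmitSketch {
        public static void main(String[] args) throws Exception {
            // build the topology the Storm way
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new RandomSpout(false, 0));
            builder.setBolt("sink", new BoltPrintSink(new TupleOutputFormatter()))
                    .shuffleGrouping("spout");

            // translate at submission time; the local cluster spins up on demand
            FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
            cluster.submitTopology("sketch", null, FlinkTopology.createTopology(builder));

            Utils.sleep(10 * 1000);
            cluster.shutdown();
        }
    }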
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
index 2070f66..4ab9d8a 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
@@ -16,10 +16,11 @@
  */
 package org.apache.flink.storm.split;
 
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
-
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class SplitStreamSpoutLocal {
        public final static String topologyId = "Spout split stream example";
@@ -35,16 +36,13 @@ public class SplitStreamSpoutLocal {
                }
 
                // build Topology the Storm way
-		final FlinkTopologyBuilder builder = SplitSpoutTopology.buildTopology();
+		final TopologyBuilder builder = SplitSpoutTopology.buildTopology();
 
-		// execute program locally
 		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
+		cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
 
-               Utils.sleep(5 * 1000);
+               Utils.sleep(10 * 1000);
 
-               // TODO kill does no do anything so far
-               cluster.killTopology(topologyId);
                cluster.shutdown();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index 2421324..77a35d0 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -17,17 +17,18 @@
  */
 package org.apache.flink.storm.tests;
 
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
 import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
 import org.apache.flink.storm.tests.operators.TaskIdBolt;
 import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 
-public class StormFieldsGroupingITCase extends StormTestBase {
+public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
 
        private final static String topologyId = "FieldsGrouping Test";
        private final static String spoutId = "spout";
@@ -52,7 +53,7 @@ public class StormFieldsGroupingITCase extends StormTestBase {
                final String[] tokens = this.resultPath.split(":");
                final String outputFile = tokens[tokens.length - 1];
 
-               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+               final TopologyBuilder builder = new TopologyBuilder();
 
                builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
                builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
@@ -60,7 +61,7 @@ public class StormFieldsGroupingITCase extends StormTestBase {
 		builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);
 
 		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
-		cluster.submitTopology(topologyId, null, builder.createTopology());
+		cluster.submitTopology(topologyId, null, FlinkTopology.createTopology(builder));
 
                Utils.sleep(10 * 1000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
index 6c5bea2..b69dde7 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/operators/TaskIdBolt.java
@@ -27,6 +27,9 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
+/**
+ * Bolt to prepend all incoming tuple values with the task id.
+ */
 public class TaskIdBolt extends BaseRichBolt {
        private static final long serialVersionUID = -7966475984592762720L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
deleted file mode 100644
index 32dac7b..0000000
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.test.util.AbstractTestBase;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-/**
- * Base class for Storm tests.
- */
-public abstract class StormTestBase extends AbstractTestBase {
-       
-       public static final int DEFAULT_PARALLELISM = 4;
-       
-       public StormTestBase() {
-               this(new Configuration());
-       }
-       
-       public StormTestBase(Configuration config) {
-               super(config, StreamingMode.STREAMING);
-               setTaskManagerNumSlots(DEFAULT_PARALLELISM);
-       }
-
-	// ------------------------------------------------------------------------
-	//  Methods to create the test program and for pre- and post- test work
-	// ------------------------------------------------------------------------
-
-       protected abstract void testProgram() throws Exception;
-
-       protected void preSubmit() throws Exception {}
-
-       protected void postSubmit() throws Exception {}
-
-	// ------------------------------------------------------------------------
-	//  Test entry point
-	// ------------------------------------------------------------------------
-
-       @Test
-       public void testJob() throws Exception {
-               try {
-                       // pre-submit
-                       try {
-                               preSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-				fail("Pre-submit work caused an error: " + e.getMessage());
-                       }
-
-                       // prepare the test environment
-                       startCluster();
-
-			// we need to initialize the stream test environment, and the storm local cluster
-			TestStreamEnvironment.setAsContext(this.executor, DEFAULT_PARALLELISM);
-			
-			FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() {
-                               @Override
-                               public FlinkLocalCluster createLocalCluster() {
-                                       return new FlinkLocalCluster(executor);
-                               }
-                       });
-
-                       // call the test program
-                       try {
-                               testProgram();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-				fail("Error while calling the test program: " + e.getMessage());
-                       }
-
-                       // post-submit
-                       try {
-                               postSubmit();
-                       }
-                       catch (Exception e) {
-                               System.err.println(e.getMessage());
-                               e.printStackTrace();
-				fail("Post-submit work caused an error: " + e.getMessage());
-                       }
-               }
-               finally {
-                       // reset the FlinkLocalCluster to its default behavior
-			FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory());
-			
-			// reset the StreamExecutionEnvironment to its default behavior
-                       TestStreamEnvironment.unsetAsContext();
-                       
-                       // clean up all resources
-                       stopCluster();
-               }
-       }
-}

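With StormTestBase gone, the word-count ITCases below extend StreamingProgramTestBase, which supplies the same preSubmit()/testProgram()/postSubmit() lifecycle plus the temp-file helpers inherited from AbstractTestBase. A sketch of the resulting test skeleton (class name hypothetical; the helpers shown are the ones these ITCases rely on, and the expected-data constant depends on the output format of the example under test):

    import org.apache.flink.streaming.util.StreamingProgramTestBase;
    import org.apache.flink.test.testdata.WordCountData;

    public class ExampleWordCountITCase extends StreamingProgramTestBase {
        protected String textPath;
        protected String resultPath;

        @Override
        protected void preSubmit() throws Exception {
            // stage the input and reserve an output location in a temp directory
            this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
            this.resultPath = this.getTempDirPath("result");
        }

        @Override
        protected void testProgram() throws Exception {
            // run the example under test against the contextual test environment
            BoltTokenizerWordCount.main(new String[]{ this.textPath, this.resultPath });
        }

        @Override
        protected void postSubmit() throws Exception {
            compareResultsByLinesInMemory(WordCountData.COUNTS, this.resultPath);
        }
    }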
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
index 62d23ab..f48e2f6 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.storm.util.StormTestBase;
-import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class BoltTokenizerWordCountITCase extends StormTestBase {
+public class BoltTokenizerWordCountITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
index 009bdc2..902cacf 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.storm.util.StormTestBase;
-import org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojo;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class BoltTokenizerWordCountPojoITCase extends StormTestBase {
+public class BoltTokenizerWordCountPojoITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
index 321015b..160efb3 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.storm.util.StormTestBase;
-import org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNames;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase {
+public class BoltTokenizerWordCountWithNamesITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
index 0cff211..17f5be5 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.storm.util.StormTestBase;
-import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class SpoutSourceWordCountITCase extends StormTestBase {
+public class SpoutSourceWordCountITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
index 39e7a25..7081207 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.storm.util.StormTestBase;
-import org.apache.flink.storm.wordcount.WordCountLocal;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class WordCountLocalITCase extends StormTestBase {
+public class WordCountLocalITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
index 78acfe5..b04faa5 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.storm.wordcount;
 
-import org.apache.flink.storm.util.StormTestBase;
-import org.apache.flink.storm.wordcount.WordCountLocalByName;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class WordCountLocalNamedITCase extends StormTestBase {
+
+public class WordCountLocalNamedITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
index 0b686e5..881dc06 100644
--- a/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
+++ b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
@@ -24,4 +24,4 @@ log4j.appender.A1=org.apache.log4j.ConsoleAppender
 
 # A1 uses PatternLayout.
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index c311f6c..fa7ae79 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -183,10 +183,10 @@ public class FlinkClient {
 
                /* set storm configuration */
                if (this.conf != null) {
-			topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+			topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(this.conf));
                }
 
-               final StreamGraph streamGraph = topology.getStreamGraph();
+		final StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
                streamGraph.setJobName(name);
 
                final JobGraph jobGraph = streamGraph.getJobGraph();

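Condensed, the client-side submission path after this change: the topology now carries its own StreamExecutionEnvironment, and the job graph is derived from it rather than from the removed StreamExecutionEnvironment subclass. A sketch restricted to the calls visible in the hunk above:

    // inside FlinkClient.submitTopology(): conf and job graph now flow
    // through the topology's embedded execution environment
    StreamExecutionEnvironment env = topology.getExecutionEnvironment();
    env.getConfig().setGlobalJobParameters(new StormConfig(conf)); // only if a Storm conf was given
    StreamGraph streamGraph = env.getStreamGraph();
    streamGraph.setJobName(name);
    JobGraph jobGraph = streamGraph.getJobGraph();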
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
index 944c6cd..00e1d03 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -26,6 +26,7 @@ import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.SubmitOptions;
 import backtype.storm.generated.TopologyInfo;
 
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.StreamingMode;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -48,12 +49,10 @@ public class FlinkLocalCluster {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
 
        /** The flink mini cluster on which to execute the programs */
-       private final FlinkMiniCluster flink;
+       private FlinkMiniCluster flink;
 
 
        public FlinkLocalCluster() {
-		this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
-               this.flink.start();
        }
 
        public FlinkLocalCluster(FlinkMiniCluster flink) {
@@ -71,13 +70,26 @@ public class FlinkLocalCluster {
                LOG.info("Running Storm topology on FlinkLocalCluster");
 
                if(conf != null) {
-			topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
+			topology.getExecutionEnvironment().getConfig().setGlobalJobParameters(new StormConfig(conf));
 		}
 
-		StreamGraph streamGraph = topology.getStreamGraph();
+		StreamGraph streamGraph = topology.getExecutionEnvironment().getStreamGraph();
                streamGraph.setJobName(topologyName);
 
                JobGraph jobGraph = streamGraph.getJobGraph();
+
+               if (flink == null) {
+
+                       Configuration configuration = new Configuration();
+                       configuration.addAll(jobGraph.getJobConfiguration());
+
+			configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
+			configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
+
+			flink = new LocalFlinkMiniCluster(configuration, true, StreamingMode.STREAMING);
+                       this.flink.start();
+               }
+
                this.flink.submitJobDetached(jobGraph);
        }
 
@@ -99,6 +111,7 @@ public class FlinkLocalCluster {
 
        public void shutdown() {
                flink.stop();
+               flink = null;
        }
 
        public String getTopologyConf(final String id) {

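The mini cluster is now created lazily on the first submitTopology() call and sized from the job graph (task slots set to the job's maximum parallelism), instead of being started eagerly in the constructor. Code that needs to share a pre-existing cluster, as the removed StormTestBase did, can still install one through the factory hook; a sketch assuming an already running FlinkMiniCluster instance named miniCluster:

    // route every FlinkLocalCluster.getLocalCluster() call to a shared cluster
    FlinkLocalCluster.initialize(new FlinkLocalCluster.LocalClusterFactory() {
        @Override
        public FlinkLocalCluster createLocalCluster() {
            return new FlinkLocalCluster(miniCluster);
        }
    });

    // ... submit topologies via FlinkLocalCluster.getLocalCluster() ...

    // restore the default lazy-start behavior afterwards
    FlinkLocalCluster.initialize(new FlinkLocalCluster.DefaultLocalClusterFactory());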
http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
index febd56d..7c8d1ec 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -20,7 +20,6 @@ package org.apache.flink.storm.api;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.typeutils.TypeExtractor;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
index 531d6df..7d55463 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -1,4 +1,5 @@
 /*
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,75 +16,473 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.storm.api;
 
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamMapper;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
+ * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink program.
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
  */
-public class FlinkTopology extends StreamExecutionEnvironment {
+public class FlinkTopology {
+
+	/** All declared streams and output schemas by operator ID */
+	private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+	/** All spouts&bolts declarers by their ID */
+	private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+
+	private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+			new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+	final HashMap<String, HashMap<String, DataStream<Tuple>>> availableInputs = new HashMap<>();
 
-	/** The number of declared tasks for the whole program (ie, sum over all dops) */
-       private int numberOfTasks = 0;
+       private final TopologyBuilder builder;
 
-       public FlinkTopology() {
-		// Set default parallelism to 1, to mirror Storm default behavior
-               super.setParallelism(1);
+       // needs to be a class member for internal testing purpose
+       private final StormTopology stormTopology;
+
+       private final Map<String, IRichSpout> spouts;
+       private final Map<String, IRichBolt> bolts;
+
+       private final StreamExecutionEnvironment env;
+
+       private FlinkTopology(TopologyBuilder builder) {
+               this.builder = builder;
+               this.stormTopology = builder.createTopology();
+               // extract the spouts and bolts
+               this.spouts = getPrivateField("_spouts");
+               this.bolts = getPrivateField("_bolts");
+
+               this.env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // Kick off the translation immediately
+               translateTopology();
        }
 
        /**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
-        * FlinkClient}.
         *
-        * @throws UnsupportedOperationException
-        *              at every invocation
+        * Creates a Flink program that uses the specified spouts and bolts.
+	 * @param stormBuilder The Storm topology builder to use for creating the Flink topology.
+	 * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed.
         */
-       @Override
-       public JobExecutionResult execute() throws Exception {
-               throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-				"instead.");
+	public static FlinkTopology createTopology(TopologyBuilder stormBuilder) {
+               return new FlinkTopology(stormBuilder);
        }
 
        /**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
-        * FlinkClient}.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
+	 * Returns the underlying Flink {@link StreamExecutionEnvironment} for the Storm topology.
+        * @return The contextual environment (local or remote).
         */
-       @Override
-	public JobExecutionResult execute(final String jobName) throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-                               "instead.");
+       public StreamExecutionEnvironment getExecutionEnvironment() {
+               return this.env;
        }
 
        /**
-	 * Increased the number of declared tasks of this program by the given value.
-	 *
-	 * @param dop
-	 *		The dop of a new operator that increases the number of overall tasks.
+	 * Directly executes the Storm topology based on the current context (local when in IDE and
+	 * remote when executed through ./bin/flink).
+	 * @return The Flink {@link JobExecutionResult} after the execution of the Storm topology.
+	 * @throws Exception which occurs during execution of the translated Storm topology.
         */
-       public void increaseNumberOfTasks(final int dop) {
-               assert (dop > 0);
-               this.numberOfTasks += dop;
+       public JobExecutionResult execute() throws Exception {
+               return env.execute();
+       }
+
+
+       @SuppressWarnings("unchecked")
+       private <T> Map<String, T> getPrivateField(String field) {
+               try {
+                       Field f = builder.getClass().getDeclaredField(field);
+                       f.setAccessible(true);
+                       return copyObject((Map<String, T>) f.get(builder));
+               } catch (NoSuchFieldException | IllegalAccessException e) {
+			throw new RuntimeException("Couldn't get " + field + " from TopologyBuilder", e);
+               }
+       }
+
+       private <T> T copyObject(T object) {
+               try {
+                       return InstantiationUtil.deserializeObject(
+					InstantiationUtil.serializeObject(object),
+                                       getClass().getClassLoader()
+                       );
+               } catch (IOException | ClassNotFoundException e) {
+                       throw new RuntimeException("Failed to copy object.");
+               }
        }
 
        /**
-        * Return the number or required tasks to execute this program.
-        *
-        * @return the number or required tasks to execute this program
+        * Creates a Flink program that uses the specified spouts and bolts.
         */
-       public int getNumberOfTasks() {
-               return this.numberOfTasks;
+       private void translateTopology() {
+
+               unprocessdInputsPerBolt.clear();
+               outputStreams.clear();
+               declarers.clear();
+               availableInputs.clear();
+
+               // Storm defaults to parallelism 1
+               env.setParallelism(1);
+
+               /* Translation of topology */
+
+
+		for (final Entry<String, IRichSpout> spout : spouts.entrySet()) {
+			final String spoutId = spout.getKey();
+			final IRichSpout userSpout = spout.getValue();
+
+			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+			userSpout.declareOutputFields(declarer);
+			final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
+			this.outputStreams.put(spoutId, sourceStreams);
+			declarers.put(spoutId, declarer);
+
+
+			final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>();
+			final DataStreamSource<?> source;
+
+			if (sourceStreams.size() == 1) {
+				final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
+				spoutWrapperSingleOutput.setStormTopology(stormTopology);
+
+				final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+
+				DataStreamSource<Tuple> src = env.addSource(spoutWrapperSingleOutput, spoutId,
+						declarer.getOutputType(outputStreamId));
+
+                               outputStreams.put(outputStreamId, src);
+                               source = src;
+                       } else {
+				final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
+						userSpout, spoutId, null, null);
+				spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+				@SuppressWarnings({ "unchecked", "rawtypes" })
+				DataStreamSource<SplitStreamType<Tuple>> multiSource = env.addSource(
+						spoutWrapperMultipleOutputs, spoutId,
+						(TypeInformation) TypeExtractor.getForClass(SplitStreamType.class));
+
+				SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
+						.split(new StormStreamSelector<Tuple>());
+				for (String streamId : sourceStreams.keySet()) {
+					SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId)
+							.map(new SplitStreamMapper<Tuple>());
+					outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
+					outputStreams.put(streamId, outStream);
+				}
+				source = multiSource;
+			}
+			availableInputs.put(spoutId, outputStreams);
+
+			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+                       if (common.is_set_parallelism_hint()) {
+                               int dop = common.get_parallelism_hint();
+                               source.setParallelism(dop);
+                       } else {
+                               common.set_parallelism_hint(1);
+                       }
+               }
+
+		/**
+		* 1. Connect all spout streams with bolts streams
+		* 2. Then proceed with the bolts stream already connected
+		*
+		*  Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		* its producer
+		* ->thus, we might need to repeat multiple times
+		*/
+		boolean makeProgress = true;
+		while (bolts.size() > 0) {
+			if (!makeProgress) {
+				throw new RuntimeException(
+						"Unable to build Topology. Could not connect the following bolts: "
+								+ bolts.keySet());
+                       }
+                       makeProgress = false;
+
+			final Iterator<Entry<String, IRichBolt>> boltsIterator = bolts.entrySet().iterator();
+			while (boltsIterator.hasNext()) {
+
+				final Entry<String, IRichBolt> bolt = boltsIterator.next();
+				final String boltId = bolt.getKey();
+				final IRichBolt userBolt = copyObject(bolt.getValue());
+
+				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+
+				Set<Entry<GlobalStreamId, Grouping>> unprocessedBoltInputs = unprocessdInputsPerBolt.get(boltId);
+				if (unprocessedBoltInputs == null) {
+					unprocessedBoltInputs = new HashSet<>();
+					unprocessedBoltInputs.addAll(common.get_inputs().entrySet());
+					unprocessdInputsPerBolt.put(boltId, unprocessedBoltInputs);
+				}
+
+				// check if all inputs are available
+				final int numberOfInputs = unprocessedBoltInputs.size();
+				int inputsAvailable = 0;
+				for (Entry<GlobalStreamId, Grouping> entry : unprocessedBoltInputs) {
+					final String producerId = entry.getKey().get_componentId();
+					final String streamId = entry.getKey().get_streamId();
+					final HashMap<String, DataStream<Tuple>> streams = availableInputs.get(producerId);
+					if (streams != null && streams.get(streamId) != null) {
+						inputsAvailable++;
+					}
+				}
+
+				if (inputsAvailable != numberOfInputs) {
+					// traverse other bolts first until inputs are available
+                                       continue;
+                               } else {
+                                       makeProgress = true;
+                                       boltsIterator.remove();
+                               }
+
+				final Map<GlobalStreamId, DataStream<Tuple>> inputStreams = new HashMap<>(numberOfInputs);
+
+				for (Entry<GlobalStreamId, Grouping> input : unprocessedBoltInputs) {
+					final GlobalStreamId streamId = input.getKey();
+					final Grouping grouping = input.getValue();
+
+					final String producerId = streamId.get_componentId();
+
+					final Map<String, DataStream<Tuple>> producer = availableInputs.get(producerId);
+
+					inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer));
+				}
+
+				final Iterator<Entry<GlobalStreamId, DataStream<Tuple>>> iterator = inputStreams.entrySet().iterator();
+
+				final Entry<GlobalStreamId, DataStream<Tuple>> firstInput = iterator.next();
+				GlobalStreamId streamId = firstInput.getKey();
+				DataStream<Tuple> inputStream = firstInput.getValue();
+
+				final SingleOutputStreamOperator<?, ?> outputStream;
+
+				switch (numberOfInputs) {
+					case 1:
+						outputStream = createOutput(boltId, userBolt, streamId, inputStream);
+						break;
+					case 2:
+						Entry<GlobalStreamId, DataStream<Tuple>> secondInput = iterator.next();
+						GlobalStreamId streamId2 = secondInput.getKey();
+						DataStream<Tuple> inputStream2 = secondInput.getValue();
+						outputStream = createOutput(boltId, userBolt, streamId, inputStream, streamId2, inputStream2);
+						break;
+					default:
+						throw new UnsupportedOperationException("Don't know how to translate a bolt "
+								+ boltId + " with " + numberOfInputs + " inputs.");
+                               }
+
+                               if (common.is_set_parallelism_hint()) {
+                                       int dop = common.get_parallelism_hint();
+                                       outputStream.setParallelism(dop);
+                               } else {
+                                       common.set_parallelism_hint(1);
+                               }
+
+                       }
+               }
        }
 
+	private DataStream<Tuple> processInput(String boltId, IRichBolt userBolt,
+											GlobalStreamId streamId, Grouping grouping,
+											Map<String, DataStream<Tuple>> producer) {
+
+               assert (userBolt != null);
+               assert(boltId != null);
+               assert(streamId != null);
+               assert(grouping != null);
+               assert(producer != null);
+
+               final String producerId = streamId.get_componentId();
+               final String inputStreamId = streamId.get_streamId();
+
+               DataStream<Tuple> inputStream = producer.get(inputStreamId);
+
+		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+               declarers.put(boltId, declarer);
+               userBolt.declareOutputFields(declarer);
+               this.outputStreams.put(boltId, declarer.outputStreams);
+
+               // if producer was processed already
+               if (grouping.is_set_shuffle()) {
+                       // Storm uses a round-robin shuffle strategy
+                       inputStream = inputStream.rebalance();
+               } else if (grouping.is_set_fields()) {
+			// global grouping is emulated in Storm via an empty fields grouping list
+			final List<String> fields = grouping.get_fields();
+			if (fields.size() > 0) {
+				FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+				inputStream = inputStream.keyBy(prodDeclarer
+						.getGroupingFieldIndexes(inputStreamId,
+								grouping.get_fields()));
+                       } else {
+                               inputStream = inputStream.global();
+                       }
+               } else if (grouping.is_set_all()) {
+                       inputStream = inputStream.broadcast();
+               } else if (!grouping.is_set_local_or_shuffle()) {
+			throw new UnsupportedOperationException(
+					"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+               }
+
+               return inputStream;
+       }
+
+	private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt, GlobalStreamId streamId, DataStream<Tuple> inputStream) {
+		return createOutput(boltId, bolt, streamId, inputStream, null, null);
+       }
+
+	private SingleOutputStreamOperator<?, ?> createOutput(String boltId, IRichBolt bolt,
+											GlobalStreamId streamId, DataStream<Tuple> inputStream,
+											GlobalStreamId streamId2, DataStream<Tuple> inputStream2) {
+               assert(boltId != null);
+               assert(streamId != null);
+               assert(inputStream != null);
+		Preconditions.checkArgument((streamId2 == null) == (inputStream2 == null));
+
+               String producerId = streamId.get_componentId();
+               String inputStreamId = streamId.get_streamId();
+
+		final HashMap<String, Fields> boltOutputs = this.outputStreams.get(boltId);
+
+		final FlinkOutputFieldsDeclarer declarer = declarers.get(boltId);
+
+               final SingleOutputStreamOperator<?, ?> outputStream;
+
+               if (boltOutputs.size() < 2) { // single output stream or sink
+                       String outputStreamId;
+                       if (boltOutputs.size() == 1) {
+				outputStreamId = (String) boltOutputs.keySet().toArray()[0];
+                       } else {
+                               outputStreamId = null;
+                       }
+
+                       final TypeInformation<Tuple> outType = declarer
+                                       .getOutputType(outputStreamId);
+
+                       final SingleOutputStreamOperator<Tuple, ?> outStream;
+
+                       // only one input
+                       if (streamId2 == null) {
+			BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>(
+					bolt, boltId, producerId, inputStreamId,
+					this.outputStreams.get(producerId).get(inputStreamId), null);
+			boltWrapper.setStormTopology(stormTopology);
+
+
+			outStream = inputStream.transform(boltId, outType, boltWrapper);
+
+                       } else {
+			String producerId2 = streamId2.get_componentId();
+			String inputStreamId2 = streamId2.get_streamId();
+
+			final BoltWrapperTwoInput<Tuple, Tuple, Tuple> boltWrapper = new BoltWrapperTwoInput<>(
+					bolt, boltId,
+				inputStreamId, inputStreamId2, producerId, producerId2,
+				this.outputStreams.get(producerId).get(inputStreamId),
+					this.outputStreams.get(producerId2).get(inputStreamId2)
+			);
+			boltWrapper.setStormTopology(stormTopology);
+
+			outStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper);
+                       }
+
+                       if (outType != null) {
+                               // only for non-sink nodes
+			final HashMap<String, DataStream<Tuple>> op = new HashMap<>();
+                               op.put(outputStreamId, outStream);
+                               availableInputs.put(boltId, op);
+                       }
+                       outputStream = outStream;
+               } else {
+
+                       @SuppressWarnings({ "unchecked", "rawtypes" })
+		final TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) TypeExtractor
+                                       .getForClass(SplitStreamType.class);
+
+
+		final SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream;
+
+                       // only one input
+                       if (streamId2 == null) {
+			final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>(
+				bolt, boltId, inputStreamId, producerId, this.outputStreams.get(producerId).get(inputStreamId),
+				null
+			);
+			boltWrapperMultipleOutputs.setStormTopology(stormTopology);
+
+			multiStream = inputStream.transform(boltId, outType, boltWrapperMultipleOutputs);
+                       } else {
+			String producerId2 = streamId2.get_componentId();
+			String inputStreamId2 = streamId2.get_streamId();
+
+			final BoltWrapperTwoInput<Tuple, Tuple, SplitStreamType<Tuple>> boltWrapper = new BoltWrapperTwoInput<>(
+					bolt, boltId,
+				inputStreamId, inputStreamId2, producerId, producerId2,
+				this.outputStreams.get(producerId).get(inputStreamId),
+				this.outputStreams.get(producerId2).get(inputStreamId2)
+			);
+			boltWrapper.setStormTopology(stormTopology);
+
+			multiStream = inputStream.connect(inputStream2).transform(boltId, outType, boltWrapper);
+                       }
+
+                       final SplitStream<SplitStreamType<Tuple>> splitStream = 
multiStream
+                                       .split(new 
StormStreamSelector<Tuple>());
+
+                       final HashMap<String, DataStream<Tuple>> op = new 
HashMap<>();
+                       for (String outputStreamId : boltOutputs.keySet()) {
+                               op.put(outputStreamId,
+                                               
splitStream.select(outputStreamId).map(
+                                                               new 
SplitStreamMapper<Tuple>()));
+                               SingleOutputStreamOperator<Tuple, ?> outStream 
= splitStream
+                                               .select(outputStreamId).map(new 
SplitStreamMapper<Tuple>());
+                               
outStream.getTransformation().setOutputType(declarer.getOutputType(outputStreamId));
+                               op.put(outputStreamId, outStream);
+                       }
+                       availableInputs.put(boltId, op);
+                       outputStream = multiStream;
+               }
+
+               return outputStream;
+       }
+
+       // for internal testing purposes only
+       public StormTopology getStormTopology() {
+               return this.stormTopology;
+       }
 }
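
For readers tracing the translation logic above: the multi-output path boils
down to the split/select pattern sketched below. This is a minimal sketch,
not code from the patch; multiStream stands for the operator returned by
transform(), and the stream names "even"/"odd" are hypothetical placeholders
for whatever streams the bolt actually declares.

    // each Storm-declared output stream becomes a selectable side output
    SplitStream<SplitStreamType<Tuple>> split =
            multiStream.split(new StormStreamSelector<Tuple>());

    // SplitStreamMapper strips the SplitStreamType wrapper off again
    DataStream<Tuple> even = split.select("even").map(new SplitStreamMapper<Tuple>());
    DataStream<Tuple> odd = split.select("odd").map(new SplitStreamMapper<Tuple>());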

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
deleted file mode 100644
index 42e1d68..0000000
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
+++ /dev/null
@@ -1,421 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.storm.api;
-
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.IRichStateSpout;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.storm.util.SplitStreamMapper;
-import org.apache.flink.storm.util.SplitStreamType;
-import org.apache.flink.storm.util.StormStreamSelector;
-import org.apache.flink.storm.wrappers.BoltWrapper;
-import org.apache.flink.storm.wrappers.SpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a 
Flink program instead of a Storm
- * topology. Most methods (except {@link #createTopology()} are copied from 
the original {@link TopologyBuilder}
- * implementation to ensure equal behavior.<br>
- * <br>
- * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.</strong>
- */
-public class FlinkTopologyBuilder {
-
-       /** A Storm {@link TopologyBuilder} to build a real Storm topology */
-       private final TopologyBuilder stormBuilder = new TopologyBuilder();
-       /** All user spouts by their ID */
-       private final HashMap<String, IRichSpout> spouts = new HashMap<String, 
IRichSpout>();
-       /** All user bolts by their ID */
-       private final HashMap<String, IRichBolt> bolts = new HashMap<String, 
IRichBolt>();
-       /** All declared streams and output schemas by operator ID */
-       private final HashMap<String, HashMap<String, Fields>> outputStreams = 
new HashMap<String, HashMap<String, Fields>>();
-       /** All spouts&bolts declarers by their ID */
-       private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = 
new HashMap<String, FlinkOutputFieldsDeclarer>();
-       // needs to be a class member for internal testing purpose
-       private StormTopology stormTopology;
-
-
-       /**
-        * Creates a Flink program that uses the specified spouts and bolts.
-        */
-       public FlinkTopology createTopology() {
-               this.stormTopology = this.stormBuilder.createTopology();
-
-               final FlinkTopology env = new FlinkTopology();
-               env.setParallelism(1);
-
-               final HashMap<String, HashMap<String, DataStream<Tuple>>> 
availableInputs = new HashMap<String, HashMap<String, DataStream<Tuple>>>();
-
-               for (final Entry<String, IRichSpout> spout : 
this.spouts.entrySet()) {
-                       final String spoutId = spout.getKey();
-                       final IRichSpout userSpout = spout.getValue();
-
-                       final FlinkOutputFieldsDeclarer declarer = new 
FlinkOutputFieldsDeclarer();
-                       userSpout.declareOutputFields(declarer);
-                       final HashMap<String,Fields> sourceStreams = 
declarer.outputStreams;
-                       this.outputStreams.put(spoutId, sourceStreams);
-                       declarers.put(spoutId, declarer);
-
-
-                       final HashMap<String, DataStream<Tuple>> outputStreams 
= new HashMap<String, DataStream<Tuple>>();
-                       final DataStreamSource<?> source;
-
-                       if (sourceStreams.size() == 1) {
-                               final SpoutWrapper<Tuple> 
spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, 
null);
-                               
spoutWrapperSingleOutput.setStormTopology(stormTopology);
-
-                               final String outputStreamId = (String) 
sourceStreams.keySet().toArray()[0];
-
-                               DataStreamSource<Tuple> src = 
env.addSource(spoutWrapperSingleOutput, spoutId,
-                                               
declarer.getOutputType(outputStreamId));
-
-                               outputStreams.put(outputStreamId, src);
-                               source = src;
-                       } else {
-                               final SpoutWrapper<SplitStreamType<Tuple>> 
spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
-                                               userSpout, spoutId, null, null);
-                               
spoutWrapperMultipleOutputs.setStormTopology(stormTopology);
-
-                               @SuppressWarnings({ "unchecked", "rawtypes" })
-                               DataStreamSource<SplitStreamType<Tuple>> 
multiSource = env.addSource(
-                                               spoutWrapperMultipleOutputs, 
spoutId,
-                                               (TypeInformation) 
TypeExtractor.getForClass(SplitStreamType.class));
-
-                               SplitStream<SplitStreamType<Tuple>> splitSource 
= multiSource
-                                               .split(new 
StormStreamSelector<Tuple>());
-                               for (String streamId : sourceStreams.keySet()) {
-                                       SingleOutputStreamOperator<Tuple, ?> 
outStream = splitSource.select(streamId)
-                                                       .map(new 
SplitStreamMapper<Tuple>());
-                                       
outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
-                                       outputStreams.put(streamId, outStream);
-                               }
-                               source = multiSource;
-                       }
-                       availableInputs.put(spoutId, outputStreams);
-
-                       int dop = 1;
-                       final ComponentCommon common = 
stormTopology.get_spouts().get(spoutId).get_common();
-                       if (common.is_set_parallelism_hint()) {
-                               dop = common.get_parallelism_hint();
-                               source.setParallelism(dop);
-                       } else {
-                               common.set_parallelism_hint(1);
-                       }
-                       env.increaseNumberOfTasks(dop);
-               }
-
-               final HashMap<String, IRichBolt> unprocessedBolts = new 
HashMap<String, IRichBolt>();
-               unprocessedBolts.putAll(this.bolts);
-
-               final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> 
unprocessdInputsPerBolt =
-                               new HashMap<String, Set<Entry<GlobalStreamId, 
Grouping>>>();
-
-               /* Because we do not know the order in which an iterator steps 
over a set, we might process a consumer before
-                * its producer
-                * ->thus, we might need to repeat multiple times
-                */
-               boolean makeProgress = true;
-               while (unprocessedBolts.size() > 0) {
-                       if (!makeProgress) {
-                               throw new RuntimeException(
-                                               "Unable to build Topology. 
Could not connect the following bolts: "
-                                                               + 
unprocessedBolts.keySet());
-                       }
-                       makeProgress = false;
-
-                       final Iterator<Entry<String, IRichBolt>> boltsIterator 
= unprocessedBolts.entrySet().iterator();
-                       while (boltsIterator.hasNext()) {
-
-                               final Entry<String, IRichBolt> bolt = 
boltsIterator.next();
-                               final String boltId = bolt.getKey();
-                               final IRichBolt userBolt = bolt.getValue();
-
-                               final ComponentCommon common = 
stormTopology.get_bolts().get(boltId).get_common();
-
-                               Set<Entry<GlobalStreamId, Grouping>> 
unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
-                               if (unprocessedInputs == null) {
-                                       unprocessedInputs = new 
HashSet<Entry<GlobalStreamId, Grouping>>();
-                                       
unprocessedInputs.addAll(common.get_inputs().entrySet());
-                                       unprocessdInputsPerBolt.put(boltId, 
unprocessedInputs);
-                               }
-
-                               // connect each available producer to the 
current bolt
-                               final Iterator<Entry<GlobalStreamId, Grouping>> 
inputStreamsIterator = unprocessedInputs.iterator();
-                               while (inputStreamsIterator.hasNext()) {
-
-                                       final Entry<GlobalStreamId, Grouping> 
stormInputStream = inputStreamsIterator.next();
-                                       final String producerId = 
stormInputStream.getKey().get_componentId();
-                                       final String inputStreamId = 
stormInputStream.getKey().get_streamId();
-
-                                       final HashMap<String, 
DataStream<Tuple>> producer = availableInputs.get(producerId);
-                                       if (producer != null) {
-                                               makeProgress = true;
-
-                                               DataStream<Tuple> inputStream = 
producer.get(inputStreamId);
-                                               if (inputStream != null) {
-                                                       final 
FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-                                                       
userBolt.declareOutputFields(declarer);
-                                                       final HashMap<String, 
Fields> boltOutputStreams = declarer.outputStreams;
-                                                       
this.outputStreams.put(boltId, boltOutputStreams);
-                                                       
this.declarers.put(boltId, declarer);
-
-                                                       // if producer was 
processed already
-                                                       final Grouping grouping 
= stormInputStream.getValue();
-                                                       if 
(grouping.is_set_shuffle()) {
-                                                               // Storm uses a 
round-robin shuffle strategy
-                                                               inputStream = 
inputStream.rebalance();
-                                                       } else if 
(grouping.is_set_fields()) {
-                                                               // global 
grouping is emulated in Storm via an empty fields grouping list
-                                                               final 
List<String> fields = grouping.get_fields();
-                                                               if 
(fields.size() > 0) {
-                                                                       
FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
-                                                                       
inputStream = inputStream.keyBy(prodDeclarer
-                                                                               
        .getGroupingFieldIndexes(inputStreamId,
-                                                                               
                        grouping.get_fields()));
-                                                               } else {
-                                                                       
inputStream = inputStream.global();
-                                                               }
-                                                       } else if 
(grouping.is_set_all()) {
-                                                               inputStream = 
inputStream.broadcast();
-                                                       } else if 
(!grouping.is_set_local_or_shuffle()) {
-                                                               throw new 
UnsupportedOperationException(
-                                                                               
"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
-                                                       }
-
-                                                       final 
SingleOutputStreamOperator<?, ?> outputStream;
-
-                                                       if 
(boltOutputStreams.size() < 2) { // single output stream or sink
-                                                               String 
outputStreamId = null;
-                                                               if 
(boltOutputStreams.size() == 1) {
-                                                                       
outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
-                                                               }
-                                                               final 
TypeInformation<Tuple> outType = declarer
-                                                                               
.getOutputType(outputStreamId);
-
-                                                               final 
BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, 
Tuple>(
-                                                                               
userBolt, boltId, this.outputStreams.get(producerId).get(
-                                                                               
                inputStreamId), null);
-                                                               
boltWrapperSingleOutput.setStormTopology(stormTopology);
-
-                                                               final 
SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
-                                                                               
.transform(boltId, outType, boltWrapperSingleOutput);
-
-                                                               if (outType != 
null) {
-                                                                       // only 
for non-sink nodes
-                                                                       final 
HashMap<String, DataStream<Tuple>> op = new HashMap<String, 
DataStream<Tuple>>();
-                                                                       
op.put(outputStreamId, outStream);
-                                                                       
availableInputs.put(boltId, op);
-                                                               }
-                                                               outputStream = 
outStream;
-                                                       } else {
-                                                               final 
BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new 
BoltWrapper<Tuple, SplitStreamType<Tuple>>(
-                                                                               
userBolt, boltId, this.outputStreams.get(producerId).get(
-                                                                               
                inputStreamId), null);
-                                                               
boltWrapperMultipleOutputs.setStormTopology(stormTopology);
-
-                                                               
@SuppressWarnings({ "unchecked", "rawtypes" })
-                                                               final 
TypeInformation<SplitStreamType<Tuple>> outType = (TypeInformation) 
TypeExtractor
-                                                                               
.getForClass(SplitStreamType.class);
-
-                                                               final 
SingleOutputStreamOperator<SplitStreamType<Tuple>, ?> multiStream = inputStream
-                                                                               
.transform(boltId, outType, boltWrapperMultipleOutputs);
-
-                                                               final 
SplitStream<SplitStreamType<Tuple>> splitStream = multiStream
-                                                                               
.split(new StormStreamSelector<Tuple>());
-
-                                                               final 
HashMap<String, DataStream<Tuple>> op = new HashMap<String, 
DataStream<Tuple>>();
-                                                               for (String 
outputStreamId : boltOutputStreams.keySet()) {
-                                                                       
SingleOutputStreamOperator<Tuple, ?> outStream = splitStream
-                                                                               
        .select(outputStreamId).map(
-                                                                               
                        new SplitStreamMapper<Tuple>());
-                                                                       
outStream.getTransformation().setOutputType(
-                                                                               
        declarer.getOutputType(outputStreamId));
-                                                                       
op.put(outputStreamId, outStream);
-                                                               }
-                                                               
availableInputs.put(boltId, op);
-                                                               outputStream = 
multiStream;
-                                                       }
-
-                                                       int dop = 1;
-                                                       if 
(common.is_set_parallelism_hint()) {
-                                                               dop = 
common.get_parallelism_hint();
-                                                               
outputStream.setParallelism(dop);
-                                                       } else {
-                                                               
common.set_parallelism_hint(1);
-                                                       }
-                                                       
env.increaseNumberOfTasks(dop);
-
-                                                       
inputStreamsIterator.remove();
-                                               } else {
-                                                       throw new 
RuntimeException("Cannot connect '" + boltId + "' to '"
-                                                                       + 
producerId + "'. Stream '" + inputStreamId + "' not found.");
-                                               }
-                                       }
-                               }
-
-                               if (unprocessedInputs.size() == 0) {
-                                       // all inputs are connected; processing 
bolt completed
-                                       boltsIterator.remove();
-                               }
-                       }
-               }
-               return env;
-       }
-
-       /**
-        * Define a new bolt in this topology with parallelism of just one 
thread.
-        *
-        * @param id
-        *              the id of this component. This id is referenced by 
other components that want to consume this bolt's
-        *              outputs.
-        * @param bolt
-        *              the bolt
-        * @return use the returned object to declare the inputs to this 
component
-        */
-       public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
-               return this.setBolt(id, bolt, null);
-       }
-
-       /**
-        * Define a new bolt in this topology with the specified amount of 
parallelism.
-        *
-        * @param id
-        *              the id of this component. This id is referenced by 
other components that want to consume this bolt's
-        *              outputs.
-        * @param bolt
-        *              the bolt
-        * @param parallelism_hint
-        *              the number of tasks that should be assigned to execute 
this bolt. Each task will run on a thread in a
-        *              process somewhere around the cluster.
-        * @return use the returned object to declare the inputs to this 
component
-        */
-       public BoltDeclarer setBolt(final String id, final IRichBolt bolt, 
final Number parallelism_hint) {
-               final BoltDeclarer declarer = this.stormBuilder.setBolt(id, 
bolt, parallelism_hint);
-               this.bolts.put(id, bolt);
-               return declarer;
-       }
-
-       /**
-        * Define a new bolt in this topology. This defines a basic bolt, which 
is a simpler to use but more restricted
-        * kind
-        * of bolt. Basic bolts are intended for non-aggregation processing and 
automate the anchoring/acking process to
-        * achieve proper reliability in the topology.
-        *
-        * @param id
-        *              the id of this component. This id is referenced by 
other components that want to consume this bolt's
-        *              outputs.
-        * @param bolt
-        *              the basic bolt
-        * @return use the returned object to declare the inputs to this 
component
-        */
-       public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
-               return this.setBolt(id, bolt, null);
-       }
-
-       /**
-        * Define a new bolt in this topology. This defines a basic bolt, which 
is a simpler to use but more restricted
-        * kind
-        * of bolt. Basic bolts are intended for non-aggregation processing and 
automate the anchoring/acking process to
-        * achieve proper reliability in the topology.
-        *
-        * @param id
-        *              the id of this component. This id is referenced by 
other components that want to consume this bolt's
-        *              outputs.
-        * @param bolt
-        *              the basic bolt
-        * @param parallelism_hint
-        *              the number of tasks that should be assigned to execute 
this bolt. Each task will run on a thread in a
-        *              process somewhere around the cluster.
-        * @return use the returned object to declare the inputs to this 
component
-        */
-       public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, 
final Number parallelism_hint) {
-               return this.setBolt(id, new BasicBoltExecutor(bolt), 
parallelism_hint);
-       }
-
-       /**
-        * Define a new spout in this topology.
-        *
-        * @param id
-        *              the id of this component. This id is referenced by 
other components that want to consume this spout's
-        *              outputs.
-        * @param spout
-        *              the spout
-        */
-       public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
-               return this.setSpout(id, spout, null);
-       }
-
-       /**
-        * Define a new spout in this topology with the specified parallelism. 
If the spout declares itself as
-        * non-distributed, the parallelism_hint will be ignored and only one 
task will be allocated to this component.
-        *
-        * @param id
-        *              the id of this component. This id is referenced by 
other components that want to consume this spout's
-        *              outputs.
-        * @param parallelism_hint
-        *              the number of tasks that should be assigned to execute 
this spout. Each task will run on a thread in a
-        *              process somewhere around the cluster.
-        * @param spout
-        *              the spout
-        */
-       public SpoutDeclarer setSpout(final String id, final IRichSpout spout, 
final Number parallelism_hint) {
-               final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, 
spout, parallelism_hint);
-               this.spouts.put(id, spout);
-               return declarer;
-       }
-
-       // TODO add StateSpout support (Storm 0.9.4 does not yet support 
StateSpouts itself)
-       /* not implemented by Storm 0.9.4
-        * public void setStateSpout(final String id, final IRichStateSpout 
stateSpout) {
-        * this.stormBuilder.setStateSpout(id, stateSpout);
-        * }
-        * public void setStateSpout(final String id, final IRichStateSpout 
stateSpout, final Number parallelism_hint) {
-        * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
-        * }
-        */
-
-       // for internal testing purpose only
-       StormTopology getStormTopology() {
-               return this.stormTopology;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
index 6550990..38ce58c 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
@@ -17,15 +17,14 @@
  */
 package org.apache.flink.storm.util;
 
+import backtype.storm.Config;
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-
-import backtype.storm.Config;
-
 /**
  * {@link StormConfig} is used to provide a user-defined Storm configuration 
(ie, a raw {@link Map} or {@link Config}
  * object) for embedded Spouts and Bolts.
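
The class is easiest to understand from its call site: it is registered with
the Flink environment as global job parameters and handed back to the
embedded spouts and bolts in open()/prepare(). A minimal sketch, assuming
StormConfig is populated through its Map interface (TOPOLOGY_DEBUG is just an
arbitrary example setting):

    import backtype.storm.Config;
    import org.apache.flink.storm.util.StormConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StormConfigExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            StormConfig stormConfig = new StormConfig();
            stormConfig.put(Config.TOPOLOGY_DEBUG, true); // any Storm setting

            // makes the config visible to all embedded spouts and bolts
            env.getConfig().setGlobalJobParameters(stormConfig);
        }
    }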

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
index d9f4178..6072e0f 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
@@ -17,15 +17,15 @@
  */
 package org.apache.flink.storm.util;
 
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
 /**
- * Used by {@link FlinkTopologyBuilder} to split multiple declared output 
streams within Flink.
+ * Used by {@link FlinkTopology} to split multiple declared output streams 
within Flink.
  */
 public final class StormStreamSelector<T> implements 
OutputSelector<SplitStreamType<T>> {
        private static final long serialVersionUID = 2553423379715401023L;
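
For context, an OutputSelector maps each record to the names of the output
streams it should be routed to; StormStreamSelector does this based on the
stream id carried inside SplitStreamType. An illustrative selector (not part
of this change) that routes integers by parity:

    import java.util.Collections;

    import org.apache.flink.streaming.api.collector.selector.OutputSelector;

    public final class ParitySelector implements OutputSelector<Integer> {
        private static final long serialVersionUID = 1L;

        @Override
        public Iterable<String> select(Integer value) {
            // route each record to exactly one named output stream
            return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
        }
    }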

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index ee06f0a..a0115f3 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -16,15 +16,13 @@
  */
 package org.apache.flink.storm.wrappers;
 
-import java.util.Collection;
-import java.util.HashMap;
-
 import backtype.storm.generated.StormTopology;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
-
+import backtype.storm.utils.Utils;
+import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -36,7 +34,8 @@ import 
org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import com.google.common.collect.Sets;
+import java.util.Collection;
+import java.util.HashMap;
 
 /**
  * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the 
Storm bolt within a Flink Streaming
@@ -53,21 +52,33 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
        private static final long serialVersionUID = -4788589118464155835L;
 
        /** The wrapped Storm {@link IRichBolt bolt}. */
-       private final IRichBolt bolt;
+       protected final IRichBolt bolt;
        /** The name of the bolt. */
        private final String name;
        /** Number of attributes of the bolt's output tuples per stream. */
-       private final HashMap<String, Integer> numberOfAttributes;
+       protected final HashMap<String, Integer> numberOfAttributes;
        /** The schema (ie, ordered field names) of the input stream. */
-       private final Fields inputSchema;
+       protected final Fields inputSchema;
        /** The original Storm topology. */
        protected StormTopology stormTopology;
 
+       /** The topology context of the bolt */
+       protected transient TopologyContext topologyContext;
+
+       /** The component id of the input stream for this bolt */
+       protected final String inputComponentId;
+
+       /** The stream id of the input stream for this bolt */
+       protected final String inputStreamId;
+
+       public final static String DEFAULT_OPERATOR_ID = "defaultID";
+       public final static String DEFAULT_BOLT_NAME = "defaultBoltName";
+
        /**
         *  We have to use this because Operators must output
         *  {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
         */
-       private transient TimestampedCollector<OUT> flinkCollector;
+       protected transient TimestampedCollector<OUT> flinkCollector;
 
        /**
         * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
@@ -75,8 +86,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * for POJO input types. The output type will be one of {@link Tuple0} 
to {@link Tuple25} depending on the bolt's
         * declared number of attributes.
         * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
         * @throws IllegalArgumentException
         *             If the number of declared output attributes is not with 
range [0;25].
         */
@@ -89,11 +99,9 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
         * {@link Tuple0} to {@link Tuple25}. The output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
         * the bolt's declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param inputSchema
-        *            The schema (ie, ordered field names) of the input stream.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
+        * @param inputSchema The schema (ie, ordered field names) of the input stream.
+        *
         * @throws IllegalArgumentException
         *             If the number of declared output attributes is not with 
range [0;25].
         */
@@ -108,16 +116,13 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
         * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
         * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
         * @param rawOutputs
         *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
         *            of a raw type.
         * @throws IllegalArgumentException
         *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [1;25].
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not within range [1;25].
         */
        public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
                        throws IllegalArgumentException {
@@ -131,8 +136,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
         * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
         * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
         * @param rawOutputs
         *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
         *            of a raw type.
@@ -153,8 +157,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
         * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
         * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
         * @param inputSchema
         *            The schema (ie, ordered field names) of the input stream.
         * @param rawOutputs
@@ -166,7 +169,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         *             [0;25].
         */
        public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
-                       final String[] rawOutputs) throws 
IllegalArgumentException {
+                                       final String[] rawOutputs) throws 
IllegalArgumentException {
                this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
        }
 
@@ -176,11 +179,11 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
         * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
         * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
         * @param inputSchema
-        *            The schema (ie, ordered field names) of the input stream.
+        *            The schema (ie, ordered field names) of the input stream.
         * @param rawOutputs
         *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
         *            of a raw type.
@@ -190,8 +193,8 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         *             [0;25].
         */
        public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
-                       final Collection<String> rawOutputs) throws 
IllegalArgumentException {
-               this(bolt, null, inputSchema, rawOutputs);
+                                       final Collection<String> rawOutputs) 
throws IllegalArgumentException {
+               this(bolt, DEFAULT_BOLT_NAME, Utils.DEFAULT_STREAM_ID, DEFAULT_OPERATOR_ID, inputSchema, rawOutputs);
        }
 
        /**
@@ -201,10 +204,10 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
         * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
         * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param name
-        *            The name of the bolt.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
+        * @param name The name of the bolt.
+        * @param inputStreamId The stream id of the input stream for this bolt
+        * @param inputComponentId The component id of the input stream for 
this bolt
         * @param inputSchema
         *            The schema (ie, ordered field names) of the input stream.
         * @param rawOutputs
@@ -215,10 +218,13 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
         *             [0;25].
         */
-       public BoltWrapper(final IRichBolt bolt, final String name, final 
Fields inputSchema,
-                       final Collection<String> rawOutputs) throws 
IllegalArgumentException {
+       public BoltWrapper(final IRichBolt bolt, final String name,
+                                       final String inputStreamId, final 
String inputComponentId,
+                                       final Fields inputSchema, final 
Collection<String> rawOutputs) throws IllegalArgumentException {
                this.bolt = bolt;
                this.name = name;
+               this.inputComponentId = inputComponentId;
+               this.inputStreamId = inputStreamId;
                this.inputSchema = inputSchema;
                this.numberOfAttributes = 
WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
        }
@@ -237,7 +243,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
        public void open() throws Exception {
                super.open();
 
-               this.flinkCollector = new TimestampedCollector<OUT>(output);
+               this.flinkCollector = new TimestampedCollector<>(output);
                final OutputCollector stormCollector = new OutputCollector(new 
BoltCollector<OUT>(
                                this.numberOfAttributes, flinkCollector));
 
@@ -252,9 +258,8 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
                        }
                }
 
-               final TopologyContext topologyContext = 
WrapperSetupHelper.createTopologyContext(
+               topologyContext = WrapperSetupHelper.createTopologyContext(
                                getRuntimeContext(), this.bolt, this.name, 
this.stormTopology, stormConfig);
-
                this.bolt.prepare(stormConfig, topologyContext, stormCollector);
        }
 
@@ -267,7 +272,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
        public void processElement(final StreamRecord<IN> element) throws 
Exception {
                this.flinkCollector.setTimestamp(element.getTimestamp());
                IN value = element.getValue();
-               this.bolt.execute(new StormTuple<IN>(value, inputSchema));
+               this.bolt.execute(new StormTuple<>(value, inputSchema, topologyContext.getThisTaskId(), inputStreamId, inputComponentId));
        }
 
        @Override
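
With the two new constructor arguments, wiring a single-input bolt by hand
looks roughly as follows. This is a sketch only: myBolt, input, outType, and
stormTopology are assumed to exist in scope, and "source" is a hypothetical
producer id.

    BoltWrapper<Tuple, Tuple> wrapper = new BoltWrapper<>(
            myBolt, "tokenizer",
            Utils.DEFAULT_STREAM_ID, "source", // input stream id, producer id
            new Fields("sentence"),            // schema of the input stream
            null);                             // no raw-type outputs
    wrapper.setStormTopology(stormTopology);

    DataStream<Tuple> out = input.transform("tokenizer", outType, wrapper);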

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java
new file mode 100644
index 0000000..02ffa51
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapperTwoInput.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collection;
+
+/**
+ * A {@link BoltWrapperTwoInput} wraps an {@link IRichBolt} in order to 
execute the Storm bolt within a Flink Streaming
+ * program. In contrast to {@link BoltWrapper}, this wrapper takes two input streams.
+ */
+public class BoltWrapperTwoInput<IN1, IN2, OUT> extends BoltWrapper<IN1, OUT> 
implements TwoInputStreamOperator<IN1, IN2, OUT> {
+
+       /** The schema (ie, ordered field names) of the second input stream. */
+       private final Fields inputSchema2;
+
+       /** The component id of the second input stream of the bolt */
+       private final String componentId2;
+       /** The stream id of the second input stream of the bolt */
+       private final String streamId2;
+
+       /**
+        * Instantiates a new {@link BoltWrapperTwoInput} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
+        * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
+        * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
+        * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
+        * @param boltId The name of the bolt.
+        * @param streamId1 The stream id of the first input stream for this bolt
+        * @param streamId2 The stream id of the second input stream for this bolt
+        * @param componentId1 The component id of the first input stream for this bolt
+        * @param componentId2 The component id of the second input stream for this bolt
+        * @param inputSchema1
+        *            The schema (ie, ordered field names) of the first input stream.
+        * @param inputSchema2
+        *            The schema (ie, ordered field names) of the second input stream.
+        * @throws IllegalArgumentException
+        *             If the number of declared output attributes is not within range [0;25].
+        */
+       public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId,
+                       final String streamId1, final String streamId2,
+                       final String componentId1, final String componentId2,
+                       final Fields inputSchema1, final Fields inputSchema2) throws IllegalArgumentException {
+               this(bolt, boltId, streamId1, streamId2, componentId1, componentId2, inputSchema1, inputSchema2, null);
+       }
+
+       /**
+        * Instantiates a new {@link BoltWrapperTwoInput} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
+        * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
+        * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
+        * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+        * @param bolt The Storm {@link IRichBolt bolt} to be used.
+        * @param boltId The name of the bolt.
+        * @param streamId1 The stream id of the first input stream for this 
bolt
+        * @param streamId2 The stream id of the second input stream for this bolt
+        * @param componentId1 The component id of the first input stream for 
this bolt
+        * @param componentId2 The component id of the second input stream for 
this bolt
+        * @param inputSchema1
+        *             The schema (ie, ordered field names) of the first input 
stream.
+        * @param inputSchema2
+        *             The schema (ie, ordered field names) of the second input 
stream.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type.
+        * @throws IllegalArgumentException
+        *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+        *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range [0;25].
+        */
+       public BoltWrapperTwoInput(final IRichBolt bolt, final String boltId,
+                       final String streamId1, final String streamId2,
+                       final String componentId1, final String componentId2,
+                       final Fields inputSchema1, final Fields inputSchema2,
+                       final Collection<String> rawOutputs) throws IllegalArgumentException {
+               super(bolt, boltId, streamId1, componentId1, inputSchema1, rawOutputs);
+               this.componentId2 = componentId2;
+               this.streamId2 = streamId2;
+               this.inputSchema2 = inputSchema2;
+       }
+
+       /**
+        * Sets the original Storm topology.
+        *
+        * @param stormTopology
+        *            The original Storm topology.
+        */
+       public void setStormTopology(StormTopology stormTopology) {
+               this.stormTopology = stormTopology;
+       }
+
+       @Override
+       public void processElement1(final StreamRecord<IN1> element) throws 
Exception {
+               super.processElement(element);
+       }
+
+       @Override
+       public void processElement2(StreamRecord<IN2> element) throws Exception 
{
+               this.flinkCollector.setTimestamp(element.getTimestamp());
+               IN2 value = element.getValue();
+               this.bolt.execute(new StormTuple<>(value, inputSchema2, topologyContext.getThisTaskId(), streamId2, componentId2));
+       }
+
+       @Override
+       public void processWatermark1(Watermark mark) throws Exception {
+               super.processWatermark(mark);
+       }
+
+       @Override
+       public void processWatermark2(Watermark mark) throws Exception {
+               this.output.emitWatermark(mark);
+       }
+}
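
Using the new wrapper mirrors the translation code in FlinkTopology: connect
the two Tuple streams and apply the wrapper as a TwoInputStreamOperator. A
sketch under assumed names (joinBolt, evenStream, oddStream, outType, and
stormTopology are placeholders):

    BoltWrapperTwoInput<Tuple, Tuple, Tuple> wrapper = new BoltWrapperTwoInput<>(
            joinBolt, "joiner",
            "even", "odd",                   // stream ids of input 1 and 2
            "evenVerifier", "oddVerifier",   // producing component ids
            new Fields("number"),            // schema of input 1
            new Fields("number"));           // schema of input 2
    wrapper.setStormTopology(stormTopology);

    DataStream<Tuple> joined = evenStream
            .connect(oddStream)
            .transform("joiner", outType, wrapper);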

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index 68368bf..db1d147 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -27,13 +27,12 @@ import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
+import clojure.lang.Atom;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import clojure.lang.Atom;
-
 /**
  * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites 
certain method that are not applicable when
  * a Storm topology is executed within Flink.
