http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
new file mode 100644
index 0000000..1891873
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class FiniteTestSpout implements IRichSpout {
+       private static final long serialVersionUID = 7992419478267824279L;
+
+       private int numberOfOutputTuples;
+       private SpoutOutputCollector collector;
+
+       public FiniteTestSpout(final int numberOfOutputTuples) {
+               this.numberOfOutputTuples = numberOfOutputTuples;
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void open(final Map conf, final TopologyContext context, final 
SpoutOutputCollector collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void close() {/* nothing to do */}
+
+       @Override
+       public void activate() {/* nothing to do */}
+
+       @Override
+       public void deactivate() {/* nothing to do */}
+
+       @Override
+       public void nextTuple() {
+               if (--this.numberOfOutputTuples >= 0) {
+                       this.collector.emit(new Values(new 
Integer(this.numberOfOutputTuples)));
+               }
+       }
+
+       @Override
+       public void ack(final Object msgId) {/* nothing to do */}
+
+       @Override
+       public void fail(final Object msgId) {/* nothing to do */}
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields("dummy"));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..8e63563
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
/**
 * Tests for {@code FlinkOutputFieldsDeclarer}: output-type translation for
 * declared streams, the 25-attribute limit, rejection of direct streams, and
 * grouping-field index computation.
 *
 * <p>{@code this.r} is presumably a {@code Random} provided by {@code AbstractTest}
 * — TODO confirm; several tests draw from it, so draw order matters.
 */
public class FlinkOutputFieldsDeclarerTest extends AbstractTest {



	@Test
	public void testNull() {
		// Querying an undeclared (null) stream must simply yield null.
		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
	}

	@Test
	public void testDeclare() {
		for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
			// NOTE(review): this loop runs only for j == 1; the bound "j < 2"
			// looks like it was meant to also cover multiple streams — confirm.
			for (int j = 1; j < 2; ++j) { // number of streams
				for (int k = 0; k <= 25; ++k) { // number of attributes (25 is the max allowed)
					this.runDeclareTest(i, j, k);
				}
			}
		}
	}

	@Test(expected = IllegalArgumentException.class)
	public void testDeclareSimpleToManyAttributes() {
		// 26 attributes exceed the supported maximum of 25.
		this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
	}

	@Test(expected = IllegalArgumentException.class)
	public void testDeclareNonDirectToManyAttributes() {
		this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
	}

	@Test(expected = IllegalArgumentException.class)
	public void testDeclareDefaultStreamToManyAttributes() {
		this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
	}

	@Test(expected = IllegalArgumentException.class)
	public void testDeclareFullToManyAttributes() {
		this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
	}

	/**
	 * Declares streams/attributes on a fresh declarer and verifies the
	 * resulting output type.
	 *
	 * @param testCase 0 declares via {@code declare(...)}/simple; any other
	 *        value declares non-direct (cases 2 and 3 also fall through to
	 *        non-direct via the switch default)
	 * @param numberOfStreams number of named streams; for a single stream a
	 *        coin flip decides between a named stream and the default stream
	 * @param numberOfAttributes number of declared attributes (26 triggers
	 *        IllegalArgumentException in the declarer)
	 */
	private void runDeclareTest(final int testCase, final int numberOfStreams,
			final int numberOfAttributes) {
		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();

		String[] streams = null;
		if (numberOfStreams > 1 || r.nextBoolean()) {
			streams = new String[numberOfStreams];
			for (int i = 0; i < numberOfStreams; ++i) {
				streams[i] = "stream" + i;
			}
		}

		final String[] attributes = new String[numberOfAttributes];
		for (int i = 0; i < attributes.length; ++i) {
			attributes[i] = "a" + i;
		}

		switch (testCase) {
		case 0:
			this.declareSimple(declarer, streams, attributes);
			break;
		default:
			this.declareNonDirect(declarer, streams, attributes);
		}

		if (streams == null) {
			// Nothing was declared on a named stream, so check the default stream.
			streams = new String[] { Utils.DEFAULT_STREAM_ID };
		}

		for (String stream : streams) {
			final TypeInformation<?> type = declarer.getOutputType(stream);

			if (numberOfAttributes == 1) {
				// A single attribute maps to a raw Object type, not a Tuple1.
				Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
				Assert.assertEquals(type.getTypeClass(), Object.class);
			} else {
				// Multiple attributes map to a tuple type of matching arity.
				Assert.assertEquals(numberOfAttributes, type.getArity());
				Assert.assertTrue(type.isTupleType());
			}
		}
	}

	/** Declares the attributes via the simple (non-direct, default) API. */
	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
			final String[] attributes) {

		if (streams != null) {
			for (String stream : streams) {
				declarer.declareStream(stream, new Fields(attributes));
			}
		} else {
			declarer.declare(new Fields(attributes));
		}
	}

	/** Declares the attributes with an explicit {@code direct = false} flag. */
	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
			final String[] attributes) {

		if (streams != null) {
			for (String stream : streams) {
				declarer.declareStream(stream, false, new Fields(attributes));
			}
		} else {
			declarer.declare(false, new Fields(attributes));
		}
	}

	@Test(expected = IllegalArgumentException.class)
	public void testUndeclared() {
		// Querying a stream id that was never declared must fail.
		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
		declarer.getOutputType("unknownStreamId");
	}

	@Test(expected = UnsupportedOperationException.class)
	public void testDeclareDirect() {
		// Direct streams are not supported by the Flink declarer.
		new FlinkOutputFieldsDeclarer().declare(true, null);
	}

	@Test(expected = UnsupportedOperationException.class)
	public void testDeclareDirect2() {
		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
	}

	@Test
	public void testGetGroupingFieldIndexes() {
		// Declare a random number of attributes (5..25) on the default stream.
		final int numberOfAttributes = 5 + this.r.nextInt(21);
		final String[] attributes = new String[numberOfAttributes];
		for (int i = 0; i < numberOfAttributes; ++i) {
			attributes[i] = "a" + i;
		}

		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
		declarer.declare(new Fields(attributes));

		// Randomly pick a subset of attributes as grouping fields and remember
		// which indexes were chosen.
		final int numberOfKeys = 1 + this.r.nextInt(25);
		final LinkedList<String> groupingFields = new LinkedList<String>();
		final boolean[] indexes = new boolean[numberOfAttributes];

		for (int i = 0; i < numberOfAttributes; ++i) {
			if (this.r.nextInt(26) < numberOfKeys) {
				groupingFields.add(attributes[i]);
				indexes[i] = true;
			} else {
				indexes[i] = false;
			}
		}

		// Expected result: the chosen indexes in ascending order.
		final int[] expectedResult = new int[groupingFields.size()];
		int j = 0;
		for (int i = 0; i < numberOfAttributes; ++i) {
			if (indexes[i]) {
				expectedResult[j++] = i;
			}
		}

		final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
				groupingFields);

		Assert.assertEquals(expectedResult.length, result.length);
		for (int i = 0; i < expectedResult.length; ++i) {
			Assert.assertEquals(expectedResult[i], result[i]);
		}
	}

}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
new file mode 100644
index 0000000..c3cb7d7
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
+import org.apache.flink.stormcompatibility.util.SplitStreamType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkStormStreamSelectorTest {
+
+       @Test
+       public void testSelector() {
+               FlinkStormStreamSelector<Object> selector = new 
FlinkStormStreamSelector<Object>();
+               SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+               Iterator<String> result;
+
+               tuple.streamId = "stream1";
+               result = selector.select(tuple).iterator();
+               Assert.assertEquals("stream1", result.next());
+               Assert.assertFalse(result.hasNext());
+
+               tuple.streamId = "stream2";
+               result = selector.select(tuple).iterator();
+               Assert.assertEquals("stream2", result.next());
+               Assert.assertFalse(result.hasNext());
+
+               tuple.streamId = "stream1";
+               result = selector.select(tuple).iterator();
+               Assert.assertEquals("stream1", result.next());
+               Assert.assertFalse(result.hasNext());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..bd9ea3f
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/*
+ * FlinkTopologyContext.getSources(componentId) and 
FlinkTopologyContext.getTargets(componentId) are not tested here,
+ * because those are tested in StormWrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testAddTaskHook() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .addTaskHook(null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetHooks() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .getHooks();
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisteredMetric1() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .registerMetric(null, (ICombiner) null, 0);
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisteredMetric2() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .registerMetric(null, (IReducer) null, 0);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisteredMetric3() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .registerMetric(null, (IMetric) null, 0);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetRegisteredMetricByName() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .getRegisteredMetricByName(null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSetAllSubscribedState() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .setAllSubscribedState(null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSetSubscribedState1() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .setSubscribedState(null, null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSetSubscribedState2() {
+               new FlinkTopologyContext(new StormTopology(new HashMap<String, 
SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new 
HashMap<String, StateSpoutSpec>()), null, null,
+                               null, null, null, null, null, null, null, null, 
null, null, null, null, null)
+               .setSubscribedState(null, null, null);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
index ec48719..b499373 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
 
 public class TestDummyBolt implements IRichBolt {
        private static final long serialVersionUID = 6893611247443121322L;
@@ -31,12 +32,27 @@ public class TestDummyBolt implements IRichBolt {
        public final static String shuffleStreamId = "shuffleStream";
        public final static String groupingStreamId = "groupingStream";
 
+       private boolean emit = true;
+       private TopologyContext context;
+       private OutputCollector collector;
+
        @SuppressWarnings("rawtypes")
        @Override
-       public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {}
+       public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+               this.context = context;
+               this.collector = collector;
+       }
 
        @Override
-       public void execute(Tuple input) {}
+       public void execute(Tuple input) {
+               if (this.context.getThisTaskIndex() == 0) {
+                       this.collector.emit(shuffleStreamId, input.getValues());
+               }
+               if (this.emit) {
+                       this.collector.emit(groupingStreamId, new 
Values("bolt", this.context));
+                       this.emit = false;
+               }
+       }
 
        @Override
        public void cleanup() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
index 62705b8..345ca12 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
@@ -23,6 +23,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
 
 public class TestDummySpout implements IRichSpout {
@@ -30,9 +31,16 @@ public class TestDummySpout implements IRichSpout {
 
        public final static String spoutStreamId = "spout-stream";
 
+       private boolean emit = true;
+       private TopologyContext context;
+       private SpoutOutputCollector collector;
+
        @SuppressWarnings("rawtypes")
        @Override
-       public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {}
+       public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+               this.context = context;
+               this.collector = collector;
+       }
 
        @Override
        public void close() {}
@@ -44,7 +52,12 @@ public class TestDummySpout implements IRichSpout {
        public void deactivate() {}
 
        @Override
-       public void nextTuple() {}
+       public void nextTuple() {
+               if (this.emit) {
+                       this.collector.emit(new Values(this.context));
+                       this.emit = false;
+               }
+       }
 
        @Override
        public void ack(Object msgId) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
index 5699219..c8e5584 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
@@ -16,6 +16,8 @@
  */
 package org.apache.flink.stormcompatibility.util;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import backtype.storm.task.OutputCollector;
@@ -27,12 +29,22 @@ import backtype.storm.tuple.Tuple;
 public class TestSink implements IRichBolt {
        private static final long serialVersionUID = 4314871456719370877L;
 
+       public final static List<TopologyContext> result = new 
LinkedList<TopologyContext>();
+
        @SuppressWarnings("rawtypes")
        @Override
-       public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {}
+       public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+               result.add(context);
+       }
 
        @Override
-       public void execute(Tuple input) {}
+       public void execute(Tuple input) {
+               if (input.size() == 1) {
+                       result.add((TopologyContext) input.getValue(0));
+               } else {
+                       result.add((TopologyContext) input.getValue(1));
+               }
+       }
 
        @Override
        public void cleanup() {}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
index 381e130..b44e8a1 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
@@ -18,6 +18,8 @@
 package org.apache.flink.stormcompatibility.wrappers;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Test;
@@ -43,6 +45,8 @@ public class FiniteStormSpoutWrapperTest {
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final FiniteStormSpoutWrapper<?> wrapper = new 
FiniteStormSpoutWrapper<Object>(stormSpout);
                wrapper.setRuntimeContext(taskContext);
@@ -59,6 +63,8 @@ public class FiniteStormSpoutWrapperTest {
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final FiniteStormSpoutWrapper<?> wrapper = new 
FiniteStormSpoutWrapper<Object>(stormSpout);
                wrapper.setRuntimeContext(taskContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
deleted file mode 100644
index eef35cf..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-class FiniteTestSpout implements IRichSpout {
-       private static final long serialVersionUID = 7992419478267824279L;
-
-       private int numberOfOutputTuples;
-       private SpoutOutputCollector collector;
-
-       public FiniteTestSpout(final int numberOfOutputTuples) {
-               this.numberOfOutputTuples = numberOfOutputTuples;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void open(final Map conf, final TopologyContext context, final 
SpoutOutputCollector collector) {
-               this.collector = collector;
-       }
-
-       @Override
-       public void close() {/* nothing to do */}
-
-       @Override
-       public void activate() {/* nothing to do */}
-
-       @Override
-       public void deactivate() {/* nothing to do */}
-
-       @Override
-       public void nextTuple() {
-               if (--this.numberOfOutputTuples >= 0) {
-                       this.collector.emit(new Values(new 
Integer(this.numberOfOutputTuples)));
-               }
-       }
-
-       @Override
-       public void ack(final Object msgId) {/* nothing to do */}
-
-       @Override
-       public void fail(final Object msgId) {/* nothing to do */}
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields("dummy"));
-       }
-
-       @Override
-       public Map<String, Object> getComponentConfiguration() {
-               return null;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
deleted file mode 100644
index c0a6ed3..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkStormStreamSelectorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Iterator;
-
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class FlinkStormStreamSelectorTest {
-
-       @Test
-       public void testSelector() {
-               FlinkStormStreamSelector<Object> selector = new 
FlinkStormStreamSelector<Object>();
-               SplitStreamType<Object> tuple = new SplitStreamType<Object>();
-               Iterator<String> result;
-
-               tuple.streamId = "stream1";
-               result = selector.select(tuple).iterator();
-               Assert.assertEquals("stream1", result.next());
-               Assert.assertFalse(result.hasNext());
-
-               tuple.streamId = "stream2";
-               result = selector.select(tuple).iterator();
-               Assert.assertEquals("stream2", result.next());
-               Assert.assertFalse(result.hasNext());
-
-               tuple.streamId = "stream1";
-               result = selector.select(tuple).iterator();
-               Assert.assertEquals("stream1", result.next());
-               Assert.assertFalse(result.hasNext());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..738eb1e
--- /dev/null
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class SetupOutputFieldsDeclarerTest extends AbstractTest {
+
+       @Test
+       public void testDeclare() {
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+
+               int numberOfAttributes = this.r.nextInt(26);
+               declarer.declare(createSchema(numberOfAttributes));
+               Assert.assertEquals(1, declarer.outputSchemas.size());
+               Assert.assertEquals(numberOfAttributes, 
declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
+                               .intValue());
+
+               final String sid = "streamId";
+               numberOfAttributes = this.r.nextInt(26);
+               declarer.declareStream(sid, createSchema(numberOfAttributes));
+               Assert.assertEquals(2, declarer.outputSchemas.size());
+               Assert.assertEquals(numberOfAttributes, 
declarer.outputSchemas.get(sid).intValue());
+       }
+
+       private Fields createSchema(final int numberOfAttributes) {
+               final ArrayList<String> schema = new 
ArrayList<String>(numberOfAttributes);
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       schema.add("a" + i);
+               }
+               return new Fields(schema);
+       }
+
+       @Test
+       public void testDeclareDirect() {
+               new SetupOutputFieldsDeclarer().declare(false, new Fields());
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testDeclareDirectFail() {
+               new SetupOutputFieldsDeclarer().declare(true, new Fields());
+       }
+
+       @Test
+       public void testDeclareStream() {
+               new 
SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new 
Fields());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareStreamFail() {
+               new SetupOutputFieldsDeclarer().declareStream(null, new 
Fields());
+       }
+
+       @Test
+       public void testDeclareFullStream() {
+               new 
SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new 
Fields());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareFullStreamFailNonDefaultStream() {
+               new SetupOutputFieldsDeclarer().declareStream(null, false, new 
Fields());
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testDeclareFullStreamFailDirect() {
+               new 
SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new 
Fields());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 5cfb151..6817593 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -61,9 +61,9 @@ public class StormBoltWrapperTest extends AbstractTest {
 
        @Test(expected = IllegalArgumentException.class)
        public void testWrapperRawType() throws Exception {
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy1", "dummy2"));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                new StormBoltWrapper<Object, Object>(mock(IRichBolt.class),
                                new String[] { Utils.DEFAULT_STREAM_ID });
@@ -71,26 +71,26 @@ public class StormBoltWrapperTest extends AbstractTest {
 
        @Test(expected = IllegalArgumentException.class)
        public void testWrapperToManyAttributes1() throws Exception {
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                final String[] schema = new String[26];
                for (int i = 0; i < schema.length; ++i) {
                        schema[i] = "a" + i;
                }
                declarer.declare(new Fields(schema));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
        }
 
        @Test(expected = IllegalArgumentException.class)
        public void testWrapperToManyAttributes2() throws Exception {
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                final String[] schema = new String[26];
                for (int i = 0; i < schema.length; ++i) {
                        schema[i] = "a" + i;
                }
                declarer.declare(new Fields(schema));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), new 
String[] {});
        }
@@ -133,12 +133,14 @@ public class StormBoltWrapperTest extends AbstractTest {
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final IRichBolt bolt = mock(IRichBolt.class);
 
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields(schema));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                final StormBoltWrapper wrapper = new StormBoltWrapper(bolt, 
(Fields) null);
                wrapper.setup(mock(Output.class), taskContext);
@@ -163,6 +165,8 @@ public class StormBoltWrapperTest extends AbstractTest {
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                
when(taskContext.getExecutionConfig()).thenReturn(mock(ExecutionConfig.class));
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final Output output = mock(Output.class);
 
@@ -209,14 +213,17 @@ public class StormBoltWrapperTest extends AbstractTest {
 
                final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
                
when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
-               .thenReturn(flinkConfig);
+                               .thenReturn(flinkConfig);
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy"));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
 
                final IRichBolt bolt = mock(IRichBolt.class);
                final StormBoltWrapper<Object, Object> wrapper = new 
StormBoltWrapper<Object, Object>(bolt);
@@ -249,8 +256,11 @@ public class StormBoltWrapperTest extends AbstractTest {
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final IRichBolt bolt = mock(IRichBolt.class);
+
                final StormBoltWrapper<Object, Object> wrapper = new 
StormBoltWrapper<Object, Object>(bolt);
                wrapper.setup(mock(Output.class), taskContext);
 
@@ -275,9 +285,9 @@ public class StormBoltWrapperTest extends AbstractTest {
        public void testClose() throws Exception {
                final IRichBolt bolt = mock(IRichBolt.class);
 
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy"));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                final StormBoltWrapper<Object, Object> wrapper = new 
StormBoltWrapper<Object, Object>(bolt);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index a4eea7e..77f1b05 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -22,7 +22,9 @@ import backtype.storm.tuple.Fields;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Assert;
@@ -46,12 +48,14 @@ public class StormFiniteSpoutWrapperTest extends 
AbstractTest {
        @SuppressWarnings("unchecked")
        @Test
        public void testRunExecuteFixedNumber() throws Exception {
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy"));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final IRichSpout spout = mock(IRichSpout.class);
                final int numberOfCalls = this.r.nextInt(50);
@@ -73,6 +77,8 @@ public class StormFiniteSpoutWrapperTest extends AbstractTest 
{
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
                final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = 
new StormFiniteSpoutWrapper<Tuple1<Integer>>(
@@ -94,11 +100,12 @@ public class StormFiniteSpoutWrapperTest extends 
AbstractTest {
 
                StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
                final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = 
new StormFiniteSpoutWrapper<Tuple1<Integer>>(
                                spout);
-               spoutWrapper.setRuntimeContext(taskContext);
 
                spoutWrapper.cancel();
                final TestContext collector = new TestContext();

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
deleted file mode 100644
index 561939f..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-public class StormOutputFieldsDeclarerTest extends AbstractTest {
-
-       @Test
-       public void testDeclare() {
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
-
-               int numberOfAttributes = this.r.nextInt(26);
-               declarer.declare(createSchema(numberOfAttributes));
-               Assert.assertEquals(1, declarer.outputSchemas.size());
-               Assert.assertEquals(numberOfAttributes, 
declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
-                               .intValue());
-
-               final String sid = "streamId";
-               numberOfAttributes = this.r.nextInt(26);
-               declarer.declareStream(sid, createSchema(numberOfAttributes));
-               Assert.assertEquals(2, declarer.outputSchemas.size());
-               Assert.assertEquals(numberOfAttributes, 
declarer.outputSchemas.get(sid).intValue());
-       }
-
-       private Fields createSchema(final int numberOfAttributes) {
-               final ArrayList<String> schema = new 
ArrayList<String>(numberOfAttributes);
-               for (int i = 0; i < numberOfAttributes; ++i) {
-                       schema.add("a" + i);
-               }
-               return new Fields(schema);
-       }
-
-       @Test
-       public void testDeclareDirect() {
-               new StormOutputFieldsDeclarer().declare(false, new Fields());
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testDeclareDirectFail() {
-               new StormOutputFieldsDeclarer().declare(true, new Fields());
-       }
-
-       @Test
-       public void testDeclareStream() {
-               new 
StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new 
Fields());
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testDeclareStreamFail() {
-               new StormOutputFieldsDeclarer().declareStream(null, new 
Fields());
-       }
-
-       @Test
-       public void testDeclareFullStream() {
-               new 
StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new 
Fields());
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void testDeclareFullStreamFailNonDefaultStream() {
-               new StormOutputFieldsDeclarer().declareStream(null, false, new 
Fields());
-       }
-
-       @Test(expected = UnsupportedOperationException.class)
-       public void testDeclareFullStreamFailDirect() {
-               new 
StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new 
Fields());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 04dc48d..f4fb4da 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -25,9 +25,11 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.FiniteTestSpout;
 import org.apache.flink.stormcompatibility.util.StormConfig;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -89,8 +91,12 @@ public class StormSpoutWrapperTest extends AbstractTest {
 
                final StreamingRuntimeContext taskContext = 
mock(StreamingRuntimeContext.class);
                when(taskContext.getExecutionConfig()).thenReturn(new 
ExecutionConfig());
+               when(taskContext.getTaskStubParameters()).thenReturn(new 
Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
 
                final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+
+
                final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new 
StormSpoutWrapper<Tuple1<Integer>>(spout);
                spoutWrapper.setRuntimeContext(taskContext);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
index 7497ffc..c799d63 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelperTest.java
@@ -14,29 +14,46 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.stormcompatibility.wrappers;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
 
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
+import org.apache.flink.stormcompatibility.api.TestTopologyBuilder;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
+import org.apache.flink.stormcompatibility.util.TestDummyBolt;
+import org.apache.flink.stormcompatibility.util.TestDummySpout;
+import org.apache.flink.stormcompatibility.util.TestSink;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import com.google.common.collect.Sets;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+@PowerMockIgnore("javax.*")
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormWrapperSetupHelperTest extends AbstractTest {
@@ -65,9 +82,9 @@ public class StormWrapperSetupHelperTest extends AbstractTest 
{
                        boltOrSpout = mock(IRichBolt.class);
                }
 
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields("dummy1", "dummy2"));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
                                Sets.newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }));
@@ -83,13 +100,13 @@ public class StormWrapperSetupHelperTest extends 
AbstractTest {
                        boltOrSpout = mock(IRichBolt.class);
                }
 
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                final String[] schema = new String[26];
                for (int i = 0; i < schema.length; ++i) {
                        schema[i] = "a" + i;
                }
                declarer.declare(new Fields(schema));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                StormWrapperSetupHelper.getNumberOfAttributes(boltOrSpout, 
null);
        }
@@ -119,9 +136,9 @@ public class StormWrapperSetupHelperTest extends 
AbstractTest {
                        boltOrSpout = mock(IRichBolt.class);
                }
 
-               final StormOutputFieldsDeclarer declarer = new 
StormOutputFieldsDeclarer();
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
                declarer.declare(new Fields(schema));
-               
PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                HashMap<String, Integer> attributes = new HashMap<String, 
Integer>();
                attributes.put(Utils.DEFAULT_STREAM_ID, numberOfAttributes);
@@ -132,4 +149,167 @@ public class StormWrapperSetupHelperTest extends 
AbstractTest {
                                                .newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }) : null));
        }
 
+       @Test
+       public void testCreateTopologyContext() {
+               HashMap<String, Integer> dops = new HashMap<String, Integer>();
+               dops.put("spout1", 1);
+               dops.put("spout2", 3);
+               dops.put("bolt1", 1);
+               dops.put("bolt2", 2);
+               dops.put("sink", 1);
+
+               HashMap<String, Integer> taskCounter = new HashMap<String, 
Integer>();
+               taskCounter.put("spout1", 0);
+               taskCounter.put("spout2", 0);
+               taskCounter.put("bolt1", 0);
+               taskCounter.put("bolt2", 0);
+               taskCounter.put("sink", 0);
+
+               HashMap<String, IComponent> operators = new HashMap<String, 
IComponent>();
+               operators.put("spout1", new TestDummySpout());
+               operators.put("spout2", new TestDummySpout());
+               operators.put("bolt1", new TestDummyBolt());
+               operators.put("bolt2", new TestDummyBolt());
+               operators.put("sink", new TestSink());
+
+               TopologyBuilder builder = new TopologyBuilder();
+
+               builder.setSpout("spout1", (IRichSpout) 
operators.get("spout1"), dops.get("spout1"));
+               builder.setSpout("spout2", (IRichSpout) 
operators.get("spout2"), dops.get("spout2"));
+               builder.setBolt("bolt1", (IRichBolt) operators.get("bolt1"), 
dops.get("bolt1")).shuffleGrouping("spout1");
+               builder.setBolt("bolt2", (IRichBolt) operators.get("bolt2"), 
dops.get("bolt2")).allGrouping("spout2");
+               builder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
+               .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+               .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+               final int maxRetry = 3;
+               int counter;
+               for (counter = 0; counter < maxRetry; ++counter) {
+                       LocalCluster cluster = new LocalCluster();
+                       Config c = new Config();
+                       c.setNumAckers(0);
+                       cluster.submitTopology("test", c, 
builder.createTopology());
+                       Utils.sleep((counter + 1) * 5000);
+                       cluster.shutdown();
+
+                       if (TestSink.result.size() == 8) {
+                               break;
+                       }
+               }
+               Assert.assertTrue(counter < maxRetry);
+
+               TestTopologyBuilder flinkBuilder = new TestTopologyBuilder();
+
+               flinkBuilder.setSpout("spout1", (IRichSpout) 
operators.get("spout1"), dops.get("spout1"));
+               flinkBuilder.setSpout("spout2", (IRichSpout) 
operators.get("spout2"), dops.get("spout2"));
+               flinkBuilder.setBolt("bolt1", (IRichBolt) 
operators.get("bolt1"), dops.get("bolt1")).shuffleGrouping("spout1");
+               flinkBuilder.setBolt("bolt2", (IRichBolt) 
operators.get("bolt2"), dops.get("bolt2")).allGrouping("spout2");
+               flinkBuilder.setBolt("sink", (IRichBolt) operators.get("sink"), 
dops.get("sink"))
+               .fieldsGrouping("bolt1", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt1", TestDummyBolt.shuffleStreamId)
+               .fieldsGrouping("bolt2", TestDummyBolt.groupingStreamId, new 
Fields("id"))
+               .shuffleGrouping("bolt2", TestDummyBolt.shuffleStreamId);
+
+               flinkBuilder.createTopology();
+               StormTopology stormTopology = flinkBuilder.getStormTopology();
+
+               Set<Integer> taskIds = new HashSet<Integer>();
+
+               for (TopologyContext expectedContext : TestSink.result) {
+                       final String thisComponentId = 
expectedContext.getThisComponentId();
+                       int index = taskCounter.get(thisComponentId);
+
+                       StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+                       when(context.getTaskName()).thenReturn(thisComponentId);
+                       
when(context.getNumberOfParallelSubtasks()).thenReturn(dops.get(thisComponentId));
+                       when(context.getIndexOfThisSubtask()).thenReturn(index);
+                       taskCounter.put(thisComponentId, ++index);
+
+                       Config stormConfig = new Config();
+                       stormConfig.put(StormWrapperSetupHelper.TOPOLOGY_NAME, 
"test");
+
+                       TopologyContext topologyContext = 
StormWrapperSetupHelper.createTopologyContext(
+                                       context, 
operators.get(thisComponentId), stormTopology, stormConfig);
+
+                       ComponentCommon expcetedCommon = 
expectedContext.getComponentCommon(thisComponentId);
+                       ComponentCommon common = 
topologyContext.getComponentCommon(thisComponentId);
+
+                       Assert.assertNull(topologyContext.getCodeDir());
+                       Assert.assertNull(common.get_json_conf());
+                       
Assert.assertNull(topologyContext.getExecutorData(null));
+                       Assert.assertNull(topologyContext.getPIDDir());
+                       Assert.assertNull(topologyContext.getResource(null));
+                       Assert.assertNull(topologyContext.getSharedExecutor());
+                       Assert.assertNull(expectedContext.getTaskData(null));
+                       Assert.assertNull(topologyContext.getThisWorkerPort());
+
+                       
Assert.assertTrue(expectedContext.getStormId().startsWith(topologyContext.getStormId()));
+
+                       Assert.assertEquals(expcetedCommon.get_inputs(), 
common.get_inputs());
+                       
Assert.assertEquals(expcetedCommon.get_parallelism_hint(), 
common.get_parallelism_hint());
+                       Assert.assertEquals(expcetedCommon.get_streams(), 
common.get_streams());
+                       Assert.assertEquals(expectedContext.getComponentIds(), 
topologyContext.getComponentIds());
+                       
Assert.assertEquals(expectedContext.getComponentStreams(thisComponentId),
+                                       
topologyContext.getComponentStreams(thisComponentId));
+                       Assert.assertEquals(thisComponentId, 
topologyContext.getThisComponentId());
+                       Assert.assertEquals(expectedContext.getThisSources(), 
topologyContext.getThisSources());
+                       Assert.assertEquals(expectedContext.getThisStreams(), 
topologyContext.getThisStreams());
+                       Assert.assertEquals(expectedContext.getThisTargets(), 
topologyContext.getThisTargets());
+                       Assert.assertEquals(0, 
topologyContext.getThisWorkerTasks().size());
+
+                       for (int taskId : 
topologyContext.getComponentTasks(thisComponentId)) {
+                               Assert.assertEquals(thisComponentId, 
topologyContext.getComponentId(taskId));
+                       }
+
+                       for (String componentId : 
expectedContext.getComponentIds()) {
+                               
Assert.assertEquals(expectedContext.getSources(componentId),
+                                               
topologyContext.getSources(componentId));
+                               
Assert.assertEquals(expectedContext.getTargets(componentId),
+                                               
topologyContext.getTargets(componentId));
+
+                               for (String streamId : 
expectedContext.getComponentStreams(componentId)) {
+                                       Assert.assertEquals(
+                                                       
expectedContext.getComponentOutputFields(componentId, streamId).toList(),
+                                                       
topologyContext.getComponentOutputFields(componentId, streamId).toList());
+                               }
+                       }
+
+                       for (String streamId : 
expectedContext.getThisStreams()) {
+                               
Assert.assertEquals(expectedContext.getThisOutputFields(streamId).toList(),
+                                               
topologyContext.getThisOutputFields(streamId).toList());
+                       }
+
+                       HashMap<Integer, String> taskToComponents = new 
HashMap<Integer, String>();
+                       Set<Integer> allTaskIds = new HashSet<Integer>();
+                       for (String componentId : 
expectedContext.getComponentIds()) {
+                               List<Integer> possibleTasks = 
expectedContext.getComponentTasks(componentId);
+                               List<Integer> tasks = 
topologyContext.getComponentTasks(componentId);
+
+                               Iterator<Integer> p_it = 
possibleTasks.iterator();
+                               Iterator<Integer> t_it = tasks.iterator();
+                               while(p_it.hasNext()) {
+                                       Assert.assertTrue(t_it.hasNext());
+                                       
Assert.assertNull(taskToComponents.put(p_it.next(), componentId));
+                                       
Assert.assertTrue(allTaskIds.add(t_it.next()));
+                               }
+                               Assert.assertFalse(t_it.hasNext());
+                       }
+
+                       Assert.assertEquals(taskToComponents, 
expectedContext.getTaskToComponent());
+                       
Assert.assertTrue(taskIds.add(topologyContext.getThisTaskId()));
+
+                       try {
+                               topologyContext.getHooks();
+                               Assert.fail();
+                       } catch (UnsupportedOperationException e) { /* expected 
*/ }
+
+                       try {
+                               topologyContext.getRegisteredMetricByName(null);
+                               Assert.fail();
+                       } catch (UnsupportedOperationException e) { /* expected 
*/ }
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
index 64b3e28..d3776fb 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormFileSpout.java
@@ -20,7 +20,7 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Values;
 
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 
 import java.io.IOException;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
index 6fb764d..5efff66 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.stormcompatibility.util;
 
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpout;
+import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
 
 /**
  * Implements a Storm Spout that reads String[] data stored in the memory. The 
spout stops

http://git-wip-us.apache.org/repos/asf/flink/blob/7a67a60f/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
index c992b6b..5f637d3 100644
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
+++ 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/split/SplitBoltTopology.java
@@ -49,10 +49,10 @@ public class SplitBoltTopology {
                        final String[] tokens = outputPath.split(":");
                        final String outputFile = tokens[tokens.length - 1];
                        builder.setBolt(sinkId, new 
StormBoltFileSink(outputFile, formatter))
-                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+                                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
                } else {
                        builder.setBolt(sinkId, new 
StormBoltPrintSink(formatter), 4)
-                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+                                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
                }
 
                return builder;

Reply via email to