http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 30227b8..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for 
streams that are selected via
- * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares 
multiple output streams.
- * 
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} 
tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements 
KeySelector<SplitStreamType<Tuple>, Tuple> {
-       private static final long serialVersionUID = 4672434660037669254L;
-
-       private final ArrayKeySelector<Tuple> selector;
-
-       public SplitStreamTypeKeySelector(int... fields) {
-               this.selector = new 
KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
-       }
-
-       @Override
-       public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
-               return selector.getKey(value.value);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
deleted file mode 100644
index 114fa7c..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * This interface represents a Storm spout that emits a finite number of 
records. Common Storm
- * spouts emit infinite streams by default. To change this behaviour and take 
advantage of
- * Flink's finite-source capabilities, the spout should implement this 
interface. To wrap
- * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
- */
-public interface FiniteStormSpout extends IRichSpout {
-
-       /**
-        * When returns true, the spout has reached the end of the stream.
-        *
-        * @return true, if the spout's stream reached its end, false otherwise
-        */
-       public boolean reachedEnd();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index 3eee8d6..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema 
of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link 
backtype.storm.topology.IRichBolt bolt}.<br />
- * <br />
- * <strong>CAUTION: Flink does not support direct emit.</strong>
- */
-public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-       /** The declared output streams and schemas. */
-       public final HashMap<String, Fields> outputStreams = new 
HashMap<String, Fields>();
-
-       @Override
-       public void declare(final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-       }
-
-       /**
-        * {@inheritDoc}
-        * <p/>
-        * Direct emit is no supported by Flink. Parameter {@code direct} must 
be {@code false}.
-        *
-        * @throws UnsupportedOperationException
-        *              if {@code direct} is {@code true}
-        */
-       @Override
-       public void declare(final boolean direct, final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-       }
-
-       @Override
-       public void declareStream(final String streamId, final Fields fields) {
-               this.declareStream(streamId, false, fields);
-       }
-
-       /**
-        * {@inheritDoc}
-        * <p/>
-        * Direct emit is no supported by Flink. Parameter {@code direct} must 
be {@code false}.
-        *
-        * @throws UnsupportedOperationException
-        *              if {@code direct} is {@code true}
-        */
-       @Override
-       public void declareStream(final String streamId, final boolean direct, 
final Fields fields) {
-               if (direct) {
-                       throw new UnsupportedOperationException("Direct emit is 
not supported by Flink");
-               }
-
-               this.outputStreams.put(streamId, fields);
-       }
-
-       /**
-        * Returns {@link TypeInformation} for the declared output schema for a 
specific stream.
-        * 
-        * @param streamId
-        *            A stream ID.
-        * 
-        * @return output type information for the declared output schema of 
the specified stream; or {@code null} if
-        *         {@code streamId == null}
-        * 
-        * @throws IllegalArgumentException
-        *             If no output schema was declared for the specified 
stream or if more then 25 attributes got declared.
-        */
-       public TypeInformation<?> getOutputType(final String streamId) throws 
IllegalArgumentException {
-               if (streamId == null) {
-                       return null;
-               }
-
-               Fields outputSchema = this.outputStreams.get(streamId);
-               if (outputSchema == null) {
-                       throw new IllegalArgumentException("Stream with ID '" + 
streamId
-                                       + "' was not declared.");
-               }
-
-               Tuple t;
-               final int numberOfAttributes = outputSchema.size();
-
-               if (numberOfAttributes == 1) {
-                       return TypeExtractor.getForClass(Object.class);
-               } else if (numberOfAttributes <= 25) {
-                       try {
-                               t = 
Tuple.getTupleClass(numberOfAttributes).newInstance();
-                       } catch (final InstantiationException e) {
-                               throw new RuntimeException(e);
-                       } catch (final IllegalAccessException e) {
-                               throw new RuntimeException(e);
-                       }
-               } else {
-                       throw new IllegalArgumentException("Flink supports only 
a maximum number of 25 attributes");
-               }
-
-               // TODO: declare only key fields as DefaultComparable
-               for (int i = 0; i < numberOfAttributes; ++i) {
-                       t.setField(new DefaultComparable(), i);
-               }
-
-               return TypeExtractor.getForObject(t);
-       }
-
-       /**
-        * {@link DefaultComparable} is a {@link Comparable} helper class that 
is used to get the correct {@link
-        * TypeInformation} from {@link TypeExtractor} within {@link 
#getOutputType()}. If key fields are not comparable,
-        * Flink cannot use them and will throw an exception.
-        */
-       private static class DefaultComparable implements 
Comparable<DefaultComparable> {
-
-               public DefaultComparable() {
-               }
-
-               @Override
-               public int compareTo(final DefaultComparable o) {
-                       return 0;
-               }
-       }
-
-       /**
-        * Computes the indexes within the declared output schema of the 
specified stream, for a list of given
-        * field-grouping attributes.
-        * 
-        * @param streamId
-        *            A stream ID.
-        * @param groupingFields
-        *            The names of the key fields.
-        * 
-        * @return array of {@code int}s that contains the index within the 
output schema for each attribute in the given
-        *         list
-        */
-       public int[] getGroupingFieldIndexes(final String streamId, final 
List<String> groupingFields) {
-               final int[] fieldIndexes = new int[groupingFields.size()];
-
-               for (int i = 0; i < fieldIndexes.length; ++i) {
-                       fieldIndexes[i] = 
this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
-               }
-
-               return fieldIndexes;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
deleted file mode 100644
index 7e60a87..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-/**
- * Used by {@link FlinkTopologyBuilder} to split multiple declared output 
streams within Flink.
- */
-public final class FlinkStormStreamSelector<T> implements 
OutputSelector<SplitStreamType<T>> {
-       private static final long serialVersionUID = 2553423379715401023L;
-
-       /** internal cache to avoid short living ArrayList objects. */
-       private final HashMap<String, List<String>> streams = new 
HashMap<String, List<String>>();
-
-       @Override
-       public Iterable<String> select(SplitStreamType<T> value) {
-               String sid = value.streamId;
-               List<String> streamId = this.streams.get(sid);
-               if (streamId == null) {
-                       streamId = new ArrayList<String>(1);
-                       streamId.add(sid);
-                       this.streams.put(sid, streamId);
-               }
-               return streamId;
-       }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
deleted file mode 100644
index 14af830..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import clojure.lang.Atom;
-
-/**
- * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites 
certain method that are not applicable when
- * a Storm topology is executed within Flink.
- */
-public final class FlinkTopologyContext extends TopologyContext {
-
-       /**
-        * Instantiates a new {@link FlinkTopologyContext} for a given Storm 
topology. The context object is instantiated
-        * for each parallel task
-        */
-       public FlinkTopologyContext(final StormTopology topology, 
@SuppressWarnings("rawtypes") final Map stormConf,
-                       final Map<Integer, String> taskToComponent, final 
Map<String, List<Integer>> componentToSortedTasks,
-                       final Map<String, Map<String, Fields>> 
componentToStreamToFields, final String stormId, final String codeDir,
-                       final String pidDir, final Integer taskId, final 
Integer workerPort, final List<Integer> workerTasks,
-                       final Map<String, Object> defaultResources, final 
Map<String, Object> userResources,
-                       final Map<String, Object> executorData, 
@SuppressWarnings("rawtypes") final Map registeredMetrics,
-                       final Atom openOrPrepareWasCalled) {
-               super(topology, stormConf, taskToComponent, 
componentToSortedTasks, componentToStreamToFields, stormId,
-                               codeDir, pidDir, taskId, workerPort, 
workerTasks, defaultResources, userResources, executorData,
-                               registeredMetrics, openOrPrepareWasCalled);
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public void addTaskHook(final ITaskHook hook) {
-               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public Collection<ITaskHook> getHooks() {
-               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public IMetric getRegisteredMetricByName(final String name) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @SuppressWarnings("rawtypes")
-       @Override
-       public CombinedMetric registerMetric(final String name, final ICombiner 
combiner, final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @SuppressWarnings("rawtypes")
-       @Override
-       public ReducedMetric registerMetric(final String name, final IReducer 
combiner, final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @SuppressWarnings("unchecked")
-       @Override
-       public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
-               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public <T extends ISubscribedState> T setAllSubscribedState(final T 
obj) {
-               throw new UnsupportedOperationException("Not supported by 
Flink");
-
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final T obj) {
-               throw new UnsupportedOperationException("Not supported by 
Flink");
-       }
-
-       /**
-        * Not supported by Flink.
-        *
-        * @throws UnsupportedOperationException
-        *              at every invocation
-        */
-       @Override
-       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final String streamId, final T
-                       obj) {
-               throw new UnsupportedOperationException("Not supported by 
Flink");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
deleted file mode 100644
index 9cb44ec..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-/**
- * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped 
record of type {@code T}. Can be used to get
- * a "clean" stream from a Spout/Bolt that declared multiple output streams 
(after the streams got separated using
- * {@link 
DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)
 .split(...)} and
- * {@link SplitStream#select(String...) .select(...)}).
- * 
- * @param <T>
- */
-public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, 
T> {
-       private static final long serialVersionUID = 3550359150160908564L;
-
-       @Override
-       public T map(SplitStreamType<T> value) throws Exception {
-               return value.value;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
deleted file mode 100644
index 9c7e477..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Used by {@link 
org.apache.flink.stormcompatibility.wrappers.AbstractStormCollector 
AbstractStormCollector} to wrap
- * output tuples if multiple output streams are declared. For this case, the 
Flink output data stream must be split via
- * {@link 
DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)
 .split(...)} using
- * {@link FlinkStormStreamSelector}.
- */
-public class SplitStreamType<T> {
-
-       /** The stream ID this tuple belongs to. */
-       public String streamId;
-       /** The actual data value. */
-       public T value;
-
-       @Override
-       public String toString() {
-               return "<sid:" + this.streamId + ",v:" + this.value + ">";
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-               SplitStreamType<?> other = (SplitStreamType<?>) o;
-
-               return this.streamId.equals(other.streamId) && 
this.value.equals(other.value);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 200f772..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for 
streams that are selected via
- * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares 
multiple output streams.
- * 
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} 
tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements 
KeySelector<SplitStreamType<Tuple>, Tuple> {
-       private static final long serialVersionUID = 4672434660037669254L;
-
-       private final ArrayKeySelector<Tuple> selector;
-
-       public SplitStreamTypeKeySelector(int... fields) {
-               this.selector = new 
KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
-       }
-
-       @Override
-       public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
-               return selector.getKey(value.value);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
deleted file mode 100644
index 6726ae8..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-
-import backtype.storm.Config;
-
-/**
- * {@link StormConfig} is used to provide a user-defined Storm configuration 
(ie, a raw {@link Map} or {@link Config}
- * object) for embedded Spouts and Bolts.
- */
-@SuppressWarnings("rawtypes")
-public final class StormConfig extends GlobalJobParameters implements Map {
-       private static final long serialVersionUID = 8019519109673698490L;
-
-       /** Contains the actual configuration that is provided to Spouts and 
Bolts. */
-       private final Map config = new HashMap();
-
-       /**
-        * Creates an empty configuration.
-        */
-       public StormConfig() {
-       }
-
-       /**
-        * Creates an configuration with initial values provided by the given 
{@code Map}.
-        * 
-        * @param config
-        *            Initial values for this configuration.
-        */
-       @SuppressWarnings("unchecked")
-       public StormConfig(Map config) {
-               this.config.putAll(config);
-       }
-
-
-       @Override
-       public int size() {
-               return this.config.size();
-       }
-
-       @Override
-       public boolean isEmpty() {
-               return this.config.isEmpty();
-       }
-
-       @Override
-       public boolean containsKey(Object key) {
-               return this.config.containsKey(key);
-       }
-
-       @Override
-       public boolean containsValue(Object value) {
-               return this.config.containsValue(value);
-       }
-
-       @Override
-       public Object get(Object key) {
-               return this.config.get(key);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Object put(Object key, Object value) {
-               return this.config.put(key, value);
-       }
-
-       @Override
-       public Object remove(Object key) {
-               return this.config.remove(key);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void putAll(Map m) {
-               this.config.putAll(m);
-       }
-
-       @Override
-       public void clear() {
-               this.config.clear();
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Set<Object> keySet() {
-               return this.config.keySet();
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Collection<Object> values() {
-               return this.config.values();
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public Set<java.util.Map.Entry<Object, Object>> entrySet() {
-               return this.config.entrySet();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
deleted file mode 100644
index 7b35a64..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
-/**
- * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
- */
-abstract class AbstractStormCollector<OUT> {
-
-       /** Flink output tuple of concrete type {@link Tuple0} to {@link 
Tuple25} per output stream. */
-       protected final HashMap<String, Tuple> outputTuple = new 
HashMap<String, Tuple>();
-       /** Flink split tuple. Used, if multiple output streams are declared. */
-       private final SplitStreamType<Object> splitTuple = new 
SplitStreamType<Object>();
-       /**
-        * The number of attributes of the output tuples per stream. 
(Determines the concrete type of {@link #outputTuple}).
-        * If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not 
used and "raw" data type is used.
-        */
-       protected final HashMap<String, Integer> numberOfAttributes;
-       /** Indicates of multiple output stream are declared and thus {@link 
SplitStreamType} must be used as output. */
-       private final boolean split;
-       /** Is set to {@code true} each time a tuple is emitted. */
-       boolean tupleEmitted = false;
-
-       /**
-        * Instantiates a new {@link AbstractStormCollector} that emits Flink 
tuples via {@link #doEmit(Object)}. If the
-        * number of attributes is negative, any output type is supported (ie, 
raw type). If the number of attributes is
-        * between 0 and 25, the output type is {@link Tuple0} to {@link 
Tuple25}, respectively.
-        * 
-        * @param numberOfAttributes
-        *            The number of attributes of the emitted tuples per output 
stream.
-        * @throws UnsupportedOperationException
-        *             if the specified number of attributes is greater than 25
-        */
-       public AbstractStormCollector(final HashMap<String, Integer> 
numberOfAttributes)
-                       throws UnsupportedOperationException {
-               assert (numberOfAttributes != null);
-
-               this.numberOfAttributes = numberOfAttributes;
-               this.split = this.numberOfAttributes.size() > 1;
-
-               for (Entry<String, Integer> outputStream : 
numberOfAttributes.entrySet()) {
-                       final int numAtt = outputStream.getValue();
-                       assert (numAtt >= -1);
-
-                       if (numAtt > 25) {
-                               throw new UnsupportedOperationException(
-                                               "Flink cannot handle more then 
25 attributes, but " + numAtt
-                                               + " are declared for stream '" 
+ outputStream.getKey()
-                                               + "' by the given bolt");
-                       } else if (numAtt >= 0) {
-                               try {
-                                       
this.outputTuple.put(outputStream.getKey(),
-                                                       
org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt)
-                                                       .newInstance());
-                               } catch (final InstantiationException e) {
-                                       throw new RuntimeException(e);
-                               } catch (final IllegalAccessException e) {
-                                       throw new RuntimeException(e);
-                               }
-
-                       }
-               }
-       }
-
-       /**
-        * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and 
emits this tuple via {@link #doEmit(Object)}
-        * to the specified output stream.
-        * 
-        * @param The
-        *            The output stream id.
-        * @param tuple
-        *            The Storm tuple to be emitted.
-        * @return the return value of {@link #doEmit(Object)}
-        */
-       @SuppressWarnings("unchecked")
-       protected final List<Integer> tansformAndEmit(final String streamId, 
final List<Object> tuple) {
-               List<Integer> taskIds;
-
-               final int numAtt = this.numberOfAttributes.get(streamId);
-               if (numAtt > -1) {
-                       assert (tuple.size() == numAtt);
-                       Tuple out = this.outputTuple.get(streamId);
-                       for (int i = 0; i < numAtt; ++i) {
-                               out.setField(tuple.get(i), i);
-                       }
-                       if (this.split) {
-                               this.splitTuple.streamId = streamId;
-                               this.splitTuple.value = out;
-
-                               taskIds = doEmit((OUT) this.splitTuple);
-                       } else {
-                               taskIds = doEmit((OUT) out);
-                       }
-
-               } else {
-                       assert (tuple.size() == 1);
-                       if (split) {
-                               this.splitTuple.streamId = streamId;
-                               this.splitTuple.value = tuple.get(0);
-
-                               taskIds = doEmit((OUT) this.splitTuple);
-                       } else {
-                               taskIds = doEmit((OUT) tuple.get(0));
-                       }
-               }
-               this.tupleEmitted = true;
-
-               return taskIds;
-       }
-
-       /**
-        * Emits a Flink tuple.
-        * 
-        * @param flinkTuple
-        *              The tuple to be emitted.
-        * @return the IDs of the tasks this tuple was sent to
-        */
-       protected abstract List<Integer> doEmit(OUT flinkTuple);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
deleted file mode 100644
index ccd29bb..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-import java.util.HashMap;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-/**
- * A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to 
execute the Storm bolt within a Flink
- * Streaming program. It takes the spout's output tuples and transforms them 
into Flink tuples of type {@code OUT} (see
- * {@link StormSpoutCollector} for supported types).<br />
- * <br />
- * <strong>CAUTION: currently, only simple spouts are supported! (ie, spouts 
that do not use the Storm configuration
- * <code>Map</code> or <code>TopologyContext</code> that is provided by the 
spouts's <code>prepare(..)</code> method.
- * Furthermore, ack and fail back calls as well as tuple IDs are not supported 
so far.</strong>
- */
-public abstract class AbstractStormSpoutWrapper<OUT> extends 
RichParallelSourceFunction<OUT> {
-       private static final long serialVersionUID = 4993283609095408765L;
-
-       /** Number of attributes of the bolt's output tuples per stream. */
-       private final HashMap<String, Integer> numberOfAttributes;
-       /** The wrapped Storm {@link IRichSpout spout}. */
-       protected final IRichSpout spout;
-       /** The wrapper of the given Flink collector. */
-       protected StormSpoutCollector<OUT> collector;
-       /** Indicates, if the source is still running or was canceled. */
-       protected volatile boolean isRunning = true;
-       /** The original Storm topology. */
-       protected StormTopology stormTopology;
-
-       /**
-        * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the 
given Storm {@link IRichSpout spout} such
-        * that it can be used within a Flink streaming program. The output 
type will be one of {@link Tuple0} to
-        * {@link Tuple25} depending on the spout's declared number of 
attributes.
-        *
-        * @param spout
-        *              The Storm {@link IRichSpout spout} to be used.
-        * @throws IllegalArgumentException
-        *              If the number of declared output attributes is not with 
range [0;25].
-        */
-       public AbstractStormSpoutWrapper(final IRichSpout spout) throws 
IllegalArgumentException {
-               this(spout, null);
-       }
-
-       /**
-        * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the 
given Storm {@link IRichSpout spout} such
-        * that it can be used within a Flink streaming program. The output 
type can be any type if parameter
-        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
-        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
-        * number of attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public AbstractStormSpoutWrapper(final IRichSpout spout,
-                       final Collection<String> rawOutputs)
-                                       throws IllegalArgumentException {
-               this.spout = spout;
-               this.numberOfAttributes = 
StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
-       }
-
-       /**
-        * Sets the original Storm topology.
-        * 
-        * @param stormTopology
-        *            The original Storm topology.
-        */
-       public void setStormTopology(StormTopology stormTopology) {
-               this.stormTopology = stormTopology;
-       }
-
-       @Override
-       public final void run(final SourceContext<OUT> ctx) throws Exception {
-               this.collector = new 
StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
-
-               GlobalJobParameters config = 
super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-               StormConfig stormConfig = new StormConfig();
-
-               if (config != null) {
-                       if (config instanceof StormConfig) {
-                               stormConfig = (StormConfig) config;
-                       } else {
-                               stormConfig.putAll(config.toMap());
-                       }
-               }
-
-               this.spout.open(stormConfig,
-                               StormWrapperSetupHelper.createTopologyContext(
-                                       (StreamingRuntimeContext) 
super.getRuntimeContext(),
-                                       this.spout,
-                                       this.stormTopology,
-                                       null),
-                               new SpoutOutputCollector(this.collector));
-               this.spout.activate();
-               this.execute();
-       }
-
-       /**
-        * Needs to be implemented to call the given Spout's {@link 
IRichSpout#nextTuple() nextTuple()} method. This method
-        * might use a {@code while(true)}-loop to emit an infinite number of 
tuples.
-        */
-       protected abstract void execute();
-
-       /**
-        * {@inheritDoc}
-        * <p/>
-        * Sets the {@link #isRunning} flag to {@code false}.
-        */
-       @Override
-       public void cancel() {
-               this.isRunning = false;
-       }
-
-       @Override
-       public void close() throws Exception {
-               this.spout.close();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
deleted file mode 100644
index f499ecc..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} 
that calls the wrapped
- * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method 
until {@link
- * FiniteStormSpout#reachedEnd()} is true.
- */
-public class FiniteStormSpoutWrapper<OUT> extends 
AbstractStormSpoutWrapper<OUT> {
-       private static final long serialVersionUID = -218340336648247605L;
-
-       private FiniteStormSpout finiteSpout;
-
-       /**
-        * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the 
given Storm {@link FiniteStormSpout spout} such
-        * that it can be used within a Flink streaming program. The output 
type will be one of {@link Tuple0} to
-        * {@link Tuple25} depending on the spout's declared number of 
attributes.
-        * 
-        * @param spout
-        *            The Storm {@link FiniteStormSpout spout} to be used.
-        * @throws IllegalArgumentException
-        *             If the number of declared output attributes is not with 
range [0;25].
-        */
-       public FiniteStormSpoutWrapper(FiniteStormSpout spout)
-                       throws IllegalArgumentException {
-               super(spout);
-               this.finiteSpout = spout;
-       }
-
-       /**
-        * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the 
given Storm {@link FiniteStormSpout spout} such
-        * that it can be used within a Flink streaming program. The output 
type can be any type if parameter
-        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
-        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
-        * number of attributes.
-        * 
-        * @param spout
-        *            The Storm {@link FiniteStormSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final 
String[] rawOutputs)
-                       throws IllegalArgumentException {
-               this(spout, Sets.newHashSet(rawOutputs));
-       }
-
-       /**
-        * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the 
given Storm {@link FiniteStormSpout spout} such
-        * that it can be used within a Flink streaming program. The output 
type can be any type if parameter
-        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
-        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
-        * number of attributes.
-        * 
-        * @param spout
-        *            The Storm {@link FiniteStormSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final 
Collection<String> rawOutputs)
-                       throws IllegalArgumentException {
-               super(spout, rawOutputs);
-               this.finiteSpout = spout;
-       }
-
-       /**
-        * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link 
FiniteStormSpout#reachedEnd()} is true or
-        * {@link FiniteStormSpout#cancel()} is called.
-        */
-       @Override
-       protected void execute() {
-               while (super.isRunning && !finiteSpout.reachedEnd()) {
-                       finiteSpout.nextTuple();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
deleted file mode 100644
index 3cd27d4..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * {@link SetupOutputFieldsDeclarer} is used by {@link 
StormWrapperSetupHelper} to determine the output streams and
- * number of attributes declared by the wrapped spout's or bolt's {@code 
declare(...)}/{@code declareStream(...)}
- * method.
- */
-class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-       /** The number of attributes for each declared stream by the wrapped 
operator. */
-       HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
-
-       @Override
-       public void declare(final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-       }
-
-       @Override
-       public void declare(final boolean direct, final Fields fields) {
-               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-       }
-
-       @Override
-       public void declareStream(final String streamId, final Fields fields) {
-               this.declareStream(streamId, false, fields);
-       }
-
-       @Override
-       public void declareStream(final String streamId, final boolean direct, 
final Fields fields) {
-               if (streamId == null) {
-                       throw new IllegalArgumentException("Stream ID cannot be 
null.");
-               }
-               if (direct) {
-                       throw new UnsupportedOperationException("Direct emit is 
not supported by Flink");
-               }
-
-               this.outputSchemas.put(streamId, fields.size());
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
deleted file mode 100644
index e810214..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.tuple.Tuple;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.util.Collector;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A {@link StormBoltCollector} is used by {@link StormBoltWrapper} to 
provided an Storm compatible
- * output collector to the wrapped bolt. It transforms the emitted Storm 
tuples into Flink tuples
- * and emits them via the provide {@link Output} object.
- */
-class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements 
IOutputCollector {
-
-       /** The Flink output Collector */
-       private final Collector<OUT> flinkOutput;
-
-       /**
-        * Instantiates a new {@link StormBoltCollector} that emits Flink 
tuples to the given Flink output object. If the
-        * number of attributes is negative, any output type is supported (ie, 
raw type). If the number of attributes is
-        * between 0 and 25, the output type is {@link Tuple0} to {@link 
Tuple25}, respectively.
-        * 
-        * @param numberOfAttributes
-        *            The number of attributes of the emitted tuples per output 
stream.
-        * @param flinkOutput
-        *            The Flink output object to be used.
-        * @throws UnsupportedOperationException
-        *             if the specified number of attributes is greater than 25
-        */
-       public StormBoltCollector(final HashMap<String, Integer> 
numberOfAttributes,
-                       final Collector<OUT> flinkOutput) throws 
UnsupportedOperationException {
-               super(numberOfAttributes);
-               assert (flinkOutput != null);
-               this.flinkOutput = flinkOutput;
-       }
-
-       @Override
-       protected List<Integer> doEmit(final OUT flinkTuple) {
-               this.flinkOutput.collect(flinkTuple);
-               // TODO
-               return null;
-       }
-
-       @Override
-       public void reportError(final Throwable error) {
-               // not sure, if Flink can support this
-               throw new UnsupportedOperationException("Not implemented yet");
-       }
-
-       @Override
-       public List<Integer> emit(final String streamId, final 
Collection<Tuple> anchors, final List<Object> tuple) {
-               return this.tansformAndEmit(streamId, tuple);
-       }
-
-       @Override
-       public void emitDirect(final int taskId, final String streamId, final 
Collection<Tuple> anchors, final List<Object> tuple) {
-               throw new UnsupportedOperationException("Direct emit is not 
supported by Flink");
-       }
-
-       @Override
-       public void ack(final Tuple input) {
-               throw new UnsupportedOperationException("Currently, 
acking/failing is not supported by Flink");
-       }
-
-       @Override
-       public void fail(final Tuple input) {
-               throw new UnsupportedOperationException("Currently, 
acking/failing is not supported by Flink");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
deleted file mode 100644
index 715d6df..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-import java.util.HashMap;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute 
the Storm bolt within a Flink Streaming
- * program. It takes the Flink input tuples of type {@code IN} and transforms 
them into {@link StormTuple}s that the
- * bolt can process. Furthermore, it takes the bolt's output tuples and 
transforms them into Flink tuples of type
- * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br />
- * <br />
- * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts 
that do not use the Storm configuration
- * <code>Map</code> or <code>TopologyContext</code> that is provided by the 
bolt's <code>open(..)</code> method.
- * Furthermore, acking and failing of tuples as well as accessing tuple 
attributes by field names is not supported so
- * far.</strong>
- */
-public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> 
implements OneInputStreamOperator<IN, OUT> {
-       private static final long serialVersionUID = -4788589118464155835L;
-
-       /** The wrapped Storm {@link IRichBolt bolt}. */
-       private final IRichBolt bolt;
-       /** Number of attributes of the bolt's output tuples per stream. */
-       private final HashMap<String, Integer> numberOfAttributes;
-       /** The schema (ie, ordered field names) of the input stream. */
-       private final Fields inputSchema;
-       /** The original Storm topology. */
-       protected StormTopology stormTopology;
-
-       /**
-        *  We have to use this because Operators must output
-        *  {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
-        */
-       private TimestampedCollector<OUT> flinkCollector;
-
-       /**
-        * Instantiates a new {@link StormBoltWrapper} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
-        * used within a Flink streaming program. As no input schema is 
defined, attribute-by-name access in only possible
-        * for POJO input types. The output type will be one of {@link Tuple0} 
to {@link Tuple25} depending on the bolt's
-        * declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @throws IllegalArgumentException
-        *             If the number of declared output attributes is not with 
range [0;25].
-        */
-       public StormBoltWrapper(final IRichBolt bolt) throws 
IllegalArgumentException {
-               this(bolt, null, (Collection<String>) null);
-       }
-
-       /**
-        * Instantiates a new {@link StormBoltWrapper} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
-        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
-        * {@link Tuple0} to {@link Tuple25}. The output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
-        * the bolt's declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param inputSchema
-        *            The schema (ie, ordered field names) of the input stream.
-        * @throws IllegalArgumentException
-        *             If the number of declared output attributes is not with 
range [0;25].
-        */
-       public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
-                       throws IllegalArgumentException {
-               this(bolt, inputSchema, (Collection<String>) null);
-       }
-
-       /**
-        * Instantiates a new {@link StormBoltWrapper} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
-        * used within a Flink streaming program. As no input schema is 
defined, attribute-by-name access in only possible
-        * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
-        * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
-        * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [1;25].
-        */
-       public StormBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
-                       throws IllegalArgumentException {
-               this(bolt, null, Sets.newHashSet(rawOutputs));
-       }
-
-       /**
-        * Instantiates a new {@link StormBoltWrapper} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
-        * used within a Flink streaming program. As no input schema is 
defined, attribute-by-name access in only possible
-        * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
-        * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
-        * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [1;25].
-        */
-       public StormBoltWrapper(final IRichBolt bolt, final Collection<String> 
rawOutputs)
-                       throws IllegalArgumentException {
-               this(bolt, null, rawOutputs);
-       }
-
-       /**
-        * Instantiates a new {@link StormBoltWrapper} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
-        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
-        * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
-        * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
-        * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param inputSchema
-        *            The schema (ie, ordered field names) of the input stream.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
-                       final String[] rawOutputs) throws 
IllegalArgumentException {
-               this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
-       }
-
-       /**
-        * Instantiates a new {@link StormBoltWrapper} that wraps the given 
Storm {@link IRichBolt bolt} such that it can be
-        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
-        * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
-        * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
-        * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
-        * 
-        * @param bolt
-        *            The Storm {@link IRichBolt bolt} to be used.
-        * @param inputSchema
-        *            The schema (ie, ordered field names) of the input stream.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
-                       final Collection<String> rawOutputs) throws 
IllegalArgumentException {
-               this.bolt = bolt;
-               this.inputSchema = inputSchema;
-               this.numberOfAttributes = 
StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
-       }
-
-       /**
-        * Sets the original Storm topology.
-        * 
-        * @param stormTopology
-        *            The original Storm topology.
-        */
-       public void setStormTopology(StormTopology stormTopology) {
-               this.stormTopology = stormTopology;
-       }
-
-       @Override
-       public void open(final Configuration parameters) throws Exception {
-               super.open(parameters);
-
-               final TopologyContext topologyContext = 
StormWrapperSetupHelper.createTopologyContext(
-                               super.runtimeContext, this.bolt, 
this.stormTopology, null);
-               flinkCollector = new TimestampedCollector<OUT>(output);
-               OutputCollector stormCollector = null;
-
-               if (this.numberOfAttributes.size() > 0) {
-                       stormCollector = new OutputCollector(new 
StormBoltCollector<OUT>(
-                                       this.numberOfAttributes, 
flinkCollector));
-               }
-
-               GlobalJobParameters config = 
super.executionConfig.getGlobalJobParameters();
-               StormConfig stormConfig = new StormConfig();
-
-               if (config != null) {
-                       if (config instanceof StormConfig) {
-                               stormConfig = (StormConfig) config;
-                       } else {
-                               stormConfig.putAll(config.toMap());
-                       }
-               }
-
-               this.bolt.prepare(stormConfig, topologyContext, stormCollector);
-       }
-
-       @Override
-       public void dispose() {
-               this.bolt.cleanup();
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void processElement(final StreamRecord<IN> element) throws 
Exception {
-               flinkCollector.setTimestamp(element.getTimestamp());
-               IN value = element.getValue();
-               if (value instanceof SplitStreamType) {
-                       this.bolt.execute(new 
StormTuple<IN>(((SplitStreamType<IN>) value).value,
-                                       inputSchema));
-               } else {
-                       this.bolt.execute(new StormTuple<IN>(value, 
inputSchema));
-               }
-       }
-
-       @Override
-       public void processWatermark(Watermark mark) throws Exception {
-               output.emitWatermark(mark);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
deleted file mode 100644
index 45eb56c..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} 
that calls {@link IRichSpout#nextTuple()
- * nextTuple()} for finite number of times before
- * {@link 
#run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)}
 returns. The number of
- * {@code nextTuple()} calls can be specified as a certain number of 
invocations or can be undefined. In the undefined
- * case, the {@code run(...)} method return if no record was emitted to the 
output collector for the first time.
- */
-public class StormFiniteSpoutWrapper<OUT> extends 
AbstractStormSpoutWrapper<OUT> {
-       private static final long serialVersionUID = 3883246587044801286L;
-
-       /** The number of {@link IRichSpout#nextTuple()} calls */
-       private int numberOfInvocations;
-
-       /**
-        * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the 
{@link IRichSpout#nextTuple() nextTuple()}
-        * method of the given Storm {@link IRichSpout spout} as long as it 
emits records to the output collector. The
-        * output type will be one of {@link Tuple0} to {@link Tuple25} 
depending on the spout's declared number of
-        * attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @throws IllegalArgumentException
-        *             If the number of declared output attributes is not with 
range [0;25].
-        */
-       public StormFiniteSpoutWrapper(final IRichSpout spout) throws 
IllegalArgumentException {
-               this(spout, (Collection<String>) null, -1);
-       }
-
-       /**
-        * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the 
{@link IRichSpout#nextTuple() nextTuple()}
-        * method of the given Storm {@link IRichSpout spout} {@code 
numberOfInvocations} times. The output type will be one
-        * of {@link Tuple0} to {@link Tuple25} depending on the spout's 
declared number of attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @param numberOfInvocations
-        *            The number of calls to {@link IRichSpout#nextTuple()}.
-        * @throws IllegalArgumentException
-        *             If the number of declared output attributes is not with 
range [0;25].
-        */
-       public StormFiniteSpoutWrapper(final IRichSpout spout, final int 
numberOfInvocations)
-                       throws IllegalArgumentException {
-               this(spout, (Collection<String>) null, numberOfInvocations);
-       }
-
-       /**
-        * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the 
{@link IRichSpout#nextTuple() nextTuple()}
-        * method of the given Storm {@link IRichSpout spout} as long as it 
emits records to the output collector. The
-        * output type can be any type if parameter {@code rawOutput} is {@code 
true} and the spout's number of declared
-        * output tuples is 1. If {@code rawOutput} is {@code false} the output 
type will be one of {@link Tuple0} to
-        * {@link Tuple25} depending on the spout's declared number of 
attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] 
rawOutputs)
-                       throws IllegalArgumentException {
-               this(spout, Sets.newHashSet(rawOutputs), -1);
-       }
-
-       /**
-        * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the 
{@link IRichSpout#nextTuple() nextTuple()}
-        * method of the given Storm {@link IRichSpout spout} as long as it 
emits records to the output collector. The
-        * output type can be any type if parameter {@code rawOutput} is {@code 
true} and the spout's number of declared
-        * output tuples is 1. If {@code rawOutput} is {@code false} the output 
type will be one of {@link Tuple0} to
-        * {@link Tuple25} depending on the spout's declared number of 
attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public StormFiniteSpoutWrapper(final IRichSpout spout, final 
Collection<String> rawOutputs)
-                       throws IllegalArgumentException {
-               this(spout, rawOutputs, -1);
-       }
-
-       /**
-        * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the 
{@link IRichSpout#nextTuple() nextTuple()}
-        * method of the given Storm {@link IRichSpout spout} {@code 
numberOfInvocations} times. The output type can be any
-        * type if parameter {@code rawOutput} is {@code true} and the spout's 
number of declared output tuples is 1. If
-        * {@code rawOutput} is {@code false} the output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
-        * the spout's declared number of attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @param numberOfInvocations
-        *            The number of calls to {@link IRichSpout#nextTuple()}.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] 
rawOutputs,
-                       final int numberOfInvocations) throws 
IllegalArgumentException {
-               super(spout, Sets.newHashSet(rawOutputs));
-               this.numberOfInvocations = numberOfInvocations;
-       }
-
-       /**
-        * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the 
{@link IRichSpout#nextTuple() nextTuple()}
-        * method of the given Storm {@link IRichSpout spout} {@code 
numberOfInvocations} times. The output type can be any
-        * type if parameter {@code rawOutput} is {@code true} and the spout's 
number of declared output tuples is 1. If
-        * {@code rawOutput} is {@code false} the output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
-        * the spout's declared number of attributes.
-        * 
-        * @param spout
-        *            The Storm {@link IRichSpout spout} to be used.
-        * @param rawOutputs
-        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
-        *            of a raw type.
-        * @param numberOfInvocations
-        *            The number of calls to {@link IRichSpout#nextTuple()}.
-        * @throws IllegalArgumentException
-        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
-        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
-        *             [0;25].
-        */
-       public StormFiniteSpoutWrapper(final IRichSpout spout, final 
Collection<String> rawOutputs,
-                       final int numberOfInvocations) throws 
IllegalArgumentException {
-               super(spout, rawOutputs);
-               this.numberOfInvocations = numberOfInvocations;
-       }
-
-       /**
-        * Calls {@link IRichSpout#nextTuple()} for the given number of times.
-        */
-       @Override
-       protected void execute() {
-               if (this.numberOfInvocations >= 0) {
-                       while ((--this.numberOfInvocations >= 0) && 
super.isRunning) {
-                               super.spout.nextTuple();
-                       }
-               } else {
-                       do {
-                               super.collector.tupleEmitted = false;
-                               super.spout.nextTuple();
-                       } while (super.collector.tupleEmitted && 
super.isRunning);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
deleted file mode 100644
index 5a20056..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A {@link StormSpoutCollector} is used by {@link AbstractStormSpoutWrapper} 
to provided an Storm
- * compatible output collector to the wrapped spout. It transforms the emitted 
Storm tuples into
- * Flink tuples and emits them via the provide {@link SourceContext} object.
- */
-class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements 
ISpoutOutputCollector {
-
-       /** The Flink source context object */
-       private final SourceContext<OUT> flinkContext;
-
-       /**
-        * Instantiates a new {@link StormSpoutCollector} that emits Flink 
tuples to the given Flink source context. If the
-        * number of attributes is specified as zero, any output type is 
supported. If the number of attributes is between 0
-        * to 25, the output type is {@link Tuple0} to {@link Tuple25}, 
respectively.
-        * 
-        * @param numberOfAttributes
-        *            The number of attributes of the emitted tuples.
-        * @param flinkContext
-        *            The Flink source context to be used.
-        * @throws UnsupportedOperationException
-        *             if the specified number of attributes is greater than 25
-        */
-       public StormSpoutCollector(final HashMap<String, Integer> 
numberOfAttributes,
-                       final SourceContext<OUT> flinkContext) throws 
UnsupportedOperationException {
-               super(numberOfAttributes);
-               assert (flinkContext != null);
-               this.flinkContext = flinkContext;
-       }
-
-       @Override
-       protected List<Integer> doEmit(final OUT flinkTuple) {
-               this.flinkContext.collect(flinkTuple);
-               // TODO
-               return null;
-       }
-
-       @Override
-       public void reportError(final Throwable error) {
-               // not sure, if Flink can support this
-               throw new UnsupportedOperationException("Not implemented yet");
-       }
-
-       @Override
-       public List<Integer> emit(final String streamId, final List<Object> 
tuple, final Object messageId) {
-               return this.tansformAndEmit(streamId, tuple);
-       }
-
-
-       @Override
-       public void emitDirect(final int taskId, final String streamId, final 
List<Object> tuple, final Object messageId) {
-               throw new UnsupportedOperationException("Direct emit is not 
supported by Flink");
-       }
-
-}

Reply via email to