http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java
new file mode 100644
index 0000000..85d895c
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.storm.util.SplitStreamType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+/**
+ * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
+ */
+abstract class AbstractStormCollector<OUT> {
+
+       /** Flink output tuple of concrete type {@link Tuple0} to {@link 
Tuple25} per output stream. */
+       protected final HashMap<String, Tuple> outputTuple = new 
HashMap<String, Tuple>();
+       /** Flink split tuple. Used, if multiple output streams are declared. */
+       private final SplitStreamType<Object> splitTuple = new 
SplitStreamType<Object>();
+       /**
+        * The number of attributes of the output tuples per stream. 
(Determines the concrete type of {@link #outputTuple}).
+        * If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not 
used and "raw" data type is used.
+        */
+       protected final HashMap<String, Integer> numberOfAttributes;
+       /** Indicates of multiple output stream are declared and thus {@link 
SplitStreamType} must be used as output. */
+       private final boolean split;
+       /** Is set to {@code true} each time a tuple is emitted. */
+       boolean tupleEmitted = false;
+
+       /**
+        * Instantiates a new {@link AbstractStormCollector} that emits Flink 
tuples via {@link #doEmit(Object)}. If the
+        * number of attributes is negative, any output type is supported (ie, 
raw type). If the number of attributes is
+        * between 0 and 25, the output type is {@link Tuple0} to {@link 
Tuple25}, respectively.
+        * 
+        * @param numberOfAttributes
+        *            The number of attributes of the emitted tuples per output 
stream.
+        * @throws UnsupportedOperationException
+        *             if the specified number of attributes is greater than 25
+        */
+       AbstractStormCollector(final HashMap<String, Integer> 
numberOfAttributes)
+                       throws UnsupportedOperationException {
+               assert (numberOfAttributes != null);
+
+               this.numberOfAttributes = numberOfAttributes;
+               this.split = this.numberOfAttributes.size() > 1;
+
+               for (Entry<String, Integer> outputStream : 
numberOfAttributes.entrySet()) {
+                       final int numAtt = outputStream.getValue();
+                       assert (numAtt >= -1);
+
+                       if (numAtt > 25) {
+                               throw new UnsupportedOperationException(
+                                               "Flink cannot handle more then 
25 attributes, but " + numAtt
+                                               + " are declared for stream '" 
+ outputStream.getKey()
+                                               + "' by the given bolt");
+                       } else if (numAtt >= 0) {
+                               try {
+                                       
this.outputTuple.put(outputStream.getKey(),
+                                                       
org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt)
+                                                       .newInstance());
+                               } catch (final InstantiationException e) {
+                                       throw new RuntimeException(e);
+                               } catch (final IllegalAccessException e) {
+                                       throw new RuntimeException(e);
+                               }
+
+                       }
+               }
+       }
+
+       /**
+        * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and 
emits this tuple via {@link #doEmit(Object)}
+        * to the specified output stream.
+        * 
+        * @param The
+        *            The output stream id.
+        * @param tuple
+        *            The Storm tuple to be emitted.
+        * @return the return value of {@link #doEmit(Object)}
+        */
+       @SuppressWarnings("unchecked")
+       protected final List<Integer> tansformAndEmit(final String streamId, 
final List<Object> tuple) {
+               List<Integer> taskIds;
+
+               final int numAtt = this.numberOfAttributes.get(streamId);
+               if (numAtt > -1) {
+                       assert (tuple.size() == numAtt);
+                       Tuple out = this.outputTuple.get(streamId);
+                       for (int i = 0; i < numAtt; ++i) {
+                               out.setField(tuple.get(i), i);
+                       }
+                       if (this.split) {
+                               this.splitTuple.streamId = streamId;
+                               this.splitTuple.value = out;
+
+                               taskIds = doEmit((OUT) this.splitTuple);
+                       } else {
+                               taskIds = doEmit((OUT) out);
+                       }
+
+               } else {
+                       assert (tuple.size() == 1);
+                       if (split) {
+                               this.splitTuple.streamId = streamId;
+                               this.splitTuple.value = tuple.get(0);
+
+                               taskIds = doEmit((OUT) this.splitTuple);
+                       } else {
+                               taskIds = doEmit((OUT) tuple.get(0));
+                       }
+               }
+               this.tupleEmitted = true;
+
+               return taskIds;
+       }
+
+       /**
+        * Emits a Flink tuple.
+        * 
+        * @param flinkTuple
+        *              The tuple to be emitted.
+        * @return the IDs of the tasks this tuple was sent to
+        */
+       protected abstract List<Integer> doEmit(OUT flinkTuple);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
new file mode 100644
index 0000000..58fd098
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.tuple.Tuple;
+
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.util.Collector;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A {@link BoltCollector} is used by {@link BoltWrapper} to provided an Storm 
compatible
+ * output collector to the wrapped bolt. It transforms the emitted Storm 
tuples into Flink tuples
+ * and emits them via the provide {@link Output} object.
+ */
+class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements 
IOutputCollector {
+
+       /** The Flink output Collector */
+       private final Collector<OUT> flinkOutput;
+
+       /**
+        * Instantiates a new {@link BoltCollector} that emits Flink tuples to 
the given Flink output object. If the
+        * number of attributes is negative, any output type is supported (ie, 
raw type). If the number of attributes is
+        * between 0 and 25, the output type is {@link Tuple0} to {@link 
Tuple25}, respectively.
+        * 
+        * @param numberOfAttributes
+        *            The number of attributes of the emitted tuples per output 
stream.
+        * @param flinkOutput
+        *            The Flink output object to be used.
+        * @throws UnsupportedOperationException
+        *             if the specified number of attributes is greater than 25
+        */
+       BoltCollector(final HashMap<String, Integer> numberOfAttributes,
+                       final Collector<OUT> flinkOutput) throws 
UnsupportedOperationException {
+               super(numberOfAttributes);
+               assert (flinkOutput != null);
+               this.flinkOutput = flinkOutput;
+       }
+
+       @Override
+       protected List<Integer> doEmit(final OUT flinkTuple) {
+               this.flinkOutput.collect(flinkTuple);
+               // TODO
+               return null;
+       }
+
+       @Override
+       public void reportError(final Throwable error) {
+               // not sure, if Flink can support this
+       }
+
+       @Override
+       public List<Integer> emit(final String streamId, final 
Collection<Tuple> anchors, final List<Object> tuple) {
+               return this.tansformAndEmit(streamId, tuple);
+       }
+
+       @Override
+       public void emitDirect(final int taskId, final String streamId, final 
Collection<Tuple> anchors, final List<Object> tuple) {
+               throw new UnsupportedOperationException("Direct emit is not 
supported by Flink");
+       }
+
+       @Override
+       public void ack(final Tuple input) {}
+
+       @Override
+       public void fail(final Tuple input) {}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
new file mode 100644
index 0000000..b16fc09
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the 
Storm bolt within a Flink Streaming
+ * program. It takes the Flink input tuples of type {@code IN} and transforms 
them into {@link StormTuple}s that the
+ * bolt can process. Furthermore, it takes the bolt's output tuples and 
transforms them into Flink tuples of type
+ * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br />
+ * <br />
+ * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts 
that do not use the Storm configuration
+ * <code>Map</code> or <code>TopologyContext</code> that is provided by the 
bolt's <code>open(..)</code> method.
+ * Furthermore, acking and failing of tuples as well as accessing tuple 
attributes by field names is not supported so
+ * far.</strong>
+ */
+public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> 
implements OneInputStreamOperator<IN, OUT> {
+       private static final long serialVersionUID = -4788589118464155835L;
+
+       /** The wrapped Storm {@link IRichBolt bolt}. */
+       private final IRichBolt bolt;
+       /** Number of attributes of the bolt's output tuples per stream. */
+       private final HashMap<String, Integer> numberOfAttributes;
+       /** The schema (ie, ordered field names) of the input stream. */
+       private final Fields inputSchema;
+       /** The original Storm topology. */
+       protected StormTopology stormTopology;
+       /**
+        *  We have to use this because Operators must output
+        *  {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
+        */
+       private TimestampedCollector<OUT> flinkCollector;
+
+       /**
+        * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. As no input schema is 
defined, attribute-by-name access in only possible
+        * for POJO input types. The output type will be one of {@link Tuple0} 
to {@link Tuple25} depending on the bolt's
+        * declared number of attributes.
+        * 
+        * @param bolt
+        *            The Storm {@link IRichBolt bolt} to be used.
+        * @throws IllegalArgumentException
+        *             If the number of declared output attributes is not with 
range [0;25].
+        */
+       public BoltWrapper(final IRichBolt bolt) throws 
IllegalArgumentException {
+               this(bolt, null, (Collection<String>) null);
+       }
+
+       /**
+        * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
+        * {@link Tuple0} to {@link Tuple25}. The output type will be one of 
{@link Tuple0} to {@link Tuple25} depending on
+        * the bolt's declared number of attributes.
+        * 
+        * @param bolt
+        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param inputSchema
+        *            The schema (ie, ordered field names) of the input stream.
+        * @throws IllegalArgumentException
+        *             If the number of declared output attributes is not with 
range [0;25].
+        */
+       public BoltWrapper(final IRichBolt bolt, final Fields inputSchema)
+                       throws IllegalArgumentException {
+               this(bolt, inputSchema, (Collection<String>) null);
+       }
+
+       /**
+        * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. As no input schema is 
defined, attribute-by-name access in only possible
+        * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
+        * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
+        * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+        * 
+        * @param bolt
+        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [1;25].
+        */
+       public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
+                       throws IllegalArgumentException {
+               this(bolt, null, Sets.newHashSet(rawOutputs));
+       }
+
+       /**
+        * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. As no input schema is 
defined, attribute-by-name access in only possible
+        * for POJO input types. The output type can be any type if parameter 
{@code rawOutput} is {@code true} and the
+        * bolt's number of declared output tuples is 1. If {@code rawOutput} 
is {@code false} the output type will be one
+        * of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+        * 
+        * @param bolt
+        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [1;25].
+        */
+       public BoltWrapper(final IRichBolt bolt, final Collection<String> 
rawOutputs)
+                       throws IllegalArgumentException {
+               this(bolt, null, rawOutputs);
+       }
+
+       /**
+        * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
+        * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
+        * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
+        * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+        * 
+        * @param bolt
+        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param inputSchema
+        *            The schema (ie, ordered field names) of the input stream.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
+                       final String[] rawOutputs) throws 
IllegalArgumentException {
+               this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+       }
+
+       /**
+        * Instantiates a new {@link BoltWrapper} that wraps the given Storm 
{@link IRichBolt bolt} such that it can be
+        * used within a Flink streaming program. The given input schema enable 
attribute-by-name access for input types
+        * {@link Tuple0} to {@link Tuple25}. The output type can be any type 
if parameter {@code rawOutput} is {@code true}
+        * and the bolt's number of declared output tuples is 1. If {@code 
rawOutput} is {@code false} the output type will
+        * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's 
declared number of attributes.
+        * 
+        * @param bolt
+        *            The Storm {@link IRichBolt bolt} to be used.
+        * @param inputSchema
+        *            The schema (ie, ordered field names) of the input stream.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
+                       final Collection<String> rawOutputs) throws 
IllegalArgumentException {
+               this.bolt = bolt;
+               this.inputSchema = inputSchema;
+               this.numberOfAttributes = 
WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
+       }
+
+       /**
+        * Sets the original Storm topology.
+        * 
+        * @param stormTopology
+        *            The original Storm topology.
+        */
+       public void setStormTopology(StormTopology stormTopology) {
+               this.stormTopology = stormTopology;
+       }
+
+       @Override
+       public void open(final Configuration parameters) throws Exception {
+               super.open(parameters);
+
+               this.flinkCollector = new TimestampedCollector<OUT>(output);
+               OutputCollector stormCollector = null;
+
+               if (this.numberOfAttributes.size() > 0) {
+                       stormCollector = new OutputCollector(new 
BoltCollector<OUT>(
+                                       this.numberOfAttributes, 
flinkCollector));
+               }
+
+               GlobalJobParameters config = 
super.executionConfig.getGlobalJobParameters();
+               StormConfig stormConfig = new StormConfig();
+
+               if (config != null) {
+                       if (config instanceof StormConfig) {
+                               stormConfig = (StormConfig) config;
+                       } else {
+                               stormConfig.putAll(config.toMap());
+                       }
+               }
+
+               final TopologyContext topologyContext = 
WrapperSetupHelper.createTopologyContext(
+                               super.runtimeContext, this.bolt, 
this.stormTopology, stormConfig);
+
+               this.bolt.prepare(stormConfig, topologyContext, stormCollector);
+       }
+
+       @Override
+       public void dispose() {
+               this.bolt.cleanup();
+       }
+
+       @SuppressWarnings("unchecked")
+       @Override
+       public void processElement(final StreamRecord<IN> element) throws 
Exception {
+               this.flinkCollector.setTimestamp(element.getTimestamp());
+               IN value = element.getValue();
+               if (value instanceof SplitStreamType) {
+                       this.bolt.execute(new 
StormTuple<IN>(((SplitStreamType<IN>) value).value,
+                                       inputSchema));
+               } else {
+                       this.bolt.execute(new StormTuple<IN>(value, 
inputSchema));
+               }
+       }
+
+       @Override
+       public void processWatermark(Watermark mark) throws Exception {
+               this.output.emitWatermark(mark);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
new file mode 100644
index 0000000..68368bf
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.hooks.ITaskHook;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.state.ISubscribedState;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import clojure.lang.Atom;
+
+/**
+ * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites 
certain method that are not applicable when
+ * a Storm topology is executed within Flink.
+ */
+final class FlinkTopologyContext extends TopologyContext {
+
+       /**
+        * Instantiates a new {@link FlinkTopologyContext} for a given Storm 
topology. The context object is instantiated
+        * for each parallel task
+        */
+       FlinkTopologyContext(final StormTopology topology,
+                       @SuppressWarnings("rawtypes") final Map stormConf,
+                       final Map<Integer, String> taskToComponent, final 
Map<String, List<Integer>> componentToSortedTasks,
+                       final Map<String, Map<String, Fields>> 
componentToStreamToFields, final String stormId, final String codeDir,
+                       final String pidDir, final Integer taskId, final 
Integer workerPort, final List<Integer> workerTasks,
+                       final Map<String, Object> defaultResources, final 
Map<String, Object> userResources,
+                       final Map<String, Object> executorData, 
@SuppressWarnings("rawtypes") final Map registeredMetrics,
+                       final Atom openOrPrepareWasCalled) {
+               super(topology, stormConf, taskToComponent, 
componentToSortedTasks, componentToStreamToFields, stormId,
+                               codeDir, pidDir, taskId, workerPort, 
workerTasks, defaultResources, userResources, executorData,
+                               registeredMetrics, openOrPrepareWasCalled);
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public void addTaskHook(final ITaskHook hook) {
+               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public Collection<ITaskHook> getHooks() {
+               throw new UnsupportedOperationException("Task hooks are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public IMetric getRegisteredMetricByName(final String name) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public CombinedMetric registerMetric(final String name, final ICombiner 
combiner, final int timeBucketSizeInSecs) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public ReducedMetric registerMetric(final String name, final IReducer 
combiner, final int timeBucketSizeInSecs) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @SuppressWarnings("unchecked")
+       @Override
+       public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
+               throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public <T extends ISubscribedState> T setAllSubscribedState(final T 
obj) {
+               throw new UnsupportedOperationException("Not supported by 
Flink");
+
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final T obj) {
+               throw new UnsupportedOperationException("Not supported by 
Flink");
+       }
+
+       /**
+        * Not supported by Flink.
+        *
+        * @throws UnsupportedOperationException
+        *              at every invocation
+        */
+       @Override
+       public <T extends ISubscribedState> T setSubscribedState(final String 
componentId, final String streamId, final T
+                       obj) {
+               throw new UnsupportedOperationException("Not supported by 
Flink");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
new file mode 100644
index 0000000..507305b
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import java.util.HashMap;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+/**
+ * {@link SetupOutputFieldsDeclarer} is used by {@link WrapperSetupHelper} to 
determine the output streams and
+ * number of attributes declared by the wrapped spout's or bolt's {@code 
declare(...)}/{@code declareStream(...)}
+ * method.
+ */
+class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+       /** The declared output streams and schemas. */
+       HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
+       /** The number of attributes for each declared stream by the wrapped 
operator. */
+       HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
+
+       @Override
+       public void declare(final Fields fields) {
+               this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+       }
+
+       @Override
+       public void declare(final boolean direct, final Fields fields) {
+               this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+       }
+
+       @Override
+       public void declareStream(final String streamId, final Fields fields) {
+               this.declareStream(streamId, false, fields);
+       }
+
+       @Override
+       public void declareStream(final String streamId, final boolean direct, 
final Fields fields) {
+               if (streamId == null) {
+                       throw new IllegalArgumentException("Stream ID cannot be 
null.");
+               }
+               if (direct) {
+                       throw new UnsupportedOperationException("Direct emit is 
not supported by Flink");
+               }
+
+               this.outputStreams.put(streamId, fields);
+               this.outputSchemas.put(streamId, fields.size());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
new file mode 100644
index 0000000..91fc090
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.spout.ISpoutOutputCollector;
+
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple25;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A {@link SpoutCollector} is used by {@link AbstractStormSpoutWrapper} to 
provided an Storm
+ * compatible output collector to the wrapped spout. It transforms the emitted 
Storm tuples into
+ * Flink tuples and emits them via the provide {@link SourceContext} object.
+ */
+class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements 
ISpoutOutputCollector {
+
+       /** The Flink source context object */
+       private final SourceContext<OUT> flinkContext;
+
+       /**
+        * Instantiates a new {@link SpoutCollector} that emits Flink tuples to 
the given Flink source context. If the
+        * number of attributes is specified as zero, any output type is 
supported. If the number of attributes is between 0
+        * to 25, the output type is {@link Tuple0} to {@link Tuple25}, 
respectively.
+        * 
+        * @param numberOfAttributes
+        *            The number of attributes of the emitted tuples.
+        * @param flinkContext
+        *            The Flink source context to be used.
+        * @throws UnsupportedOperationException
+        *             if the specified number of attributes is greater than 25
+        */
+       SpoutCollector(final HashMap<String, Integer> numberOfAttributes,
+                       final SourceContext<OUT> flinkContext) throws 
UnsupportedOperationException {
+               super(numberOfAttributes);
+               assert (flinkContext != null);
+               this.flinkContext = flinkContext;
+       }
+
+       @Override
+       protected List<Integer> doEmit(final OUT flinkTuple) {
+               this.flinkContext.collect(flinkTuple);
+               // TODO
+               return null;
+       }
+
+       @Override
+       public void reportError(final Throwable error) {
+               // not sure, if Flink can support this
+       }
+
+       @Override
+       public List<Integer> emit(final String streamId, final List<Object> 
tuple, final Object messageId) {
+               return this.tansformAndEmit(streamId, tuple);
+       }
+
+
+       @Override
+       public void emitDirect(final int taskId, final String streamId, final 
List<Object> tuple, final Object messageId) {
+               throw new UnsupportedOperationException("Direct emit is not 
supported by Flink");
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
new file mode 100644
index 0000000..914a19d
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import java.util.Collection;
+import java.util.HashMap;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.topology.IRichSpout;
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+import org.apache.flink.api.java.tuple.Tuple0;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.storm.util.FiniteSpout;
+import org.apache.flink.storm.util.StormConfig;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import com.google.common.collect.Sets;
+
+/**
+ * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it 
within a Flink Streaming program. It
+ * takes the spout's output tuples and transforms them into Flink tuples of 
type {@code OUT} (see
+ * {@link SpoutCollector} for supported types).<br />
+ * <br />
+ * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link 
IRichSpout#nextTuple() nextTuple()} method in
+ * an infinite loop.<br />
+ * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() 
nextTuple()} for a finite number of
+ * times and terminate automatically afterwards (for finite input streams). 
The number of {@code nextTuple()} calls can
+ * be specified as a certain number of invocations or can be undefined. In the 
undefined case, {@link SpoutWrapper}
+ * terminates if no record was emitted to the output collector for the first 
time during a call to
+ * {@link IRichSpout#nextTuple() nextTuple()}.<br />
+ * If the given spout implements {@link FiniteSpout} interface and {@link 
#numberOfInvocations} is not provided or
+ * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() 
nextTuple()} method until
+ * {@link FiniteSpout#reachedEnd()} returns true.
+ */
+public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
+       private static final long serialVersionUID = -218340336648247605L;
+
+       /** Number of attributes of the spouts's output tuples per stream. */
+       private final HashMap<String, Integer> numberOfAttributes;
+       /** The wrapped {@link IRichSpout spout}. */
+       private final IRichSpout spout;
+       /** The wrapper of the given Flink collector. */
+       private SpoutCollector<OUT> collector;
+       /** Indicates, if the source is still running or was canceled. */
+       private volatile boolean isRunning = true;
+       /** The number of {@link IRichSpout#nextTuple()} calls. */
+       private Integer numberOfInvocations; // do not use int -> null 
indicates an infinite loop
+       /** The original Storm topology. */
+       private StormTopology stormTopology;
+
+       /**
+        * Instantiates a new {@link SpoutWrapper} that calls the {@link 
IRichSpout#nextTuple() nextTuple()} method of
+        * the given {@link IRichSpout spout} in an infinite loop. The output 
type will be one of {@link Tuple0} to
+        * {@link Tuple25} depending on the spout's declared number of 
attributes.
+        * 
+        * @param spout
+        *            The {@link IRichSpout spout} to be used.
+        * @throws IllegalArgumentException
+        *             If the number of declared output attributes is not with 
range [0;25].
+        */
+       public SpoutWrapper(final IRichSpout spout) throws 
IllegalArgumentException {
+               this(spout, (Collection<String>) null, null);
+       }
+
+       /**
+        * Instantiates a new {@link SpoutWrapper} that calls the {@link 
IRichSpout#nextTuple() nextTuple()} method of
+        * the given {@link IRichSpout spout} a finite number of times. The 
output type will be one of {@link Tuple0} to
+        * {@link Tuple25} depending on the spout's declared number of 
attributes.
+        * 
+        * @param spout
+        *            The {@link IRichSpout spout} to be used.
+        * @param numberOfInvocations
+        *            The number of calls to {@link IRichSpout#nextTuple()}. If 
value is negative, {@link SpoutWrapper}
+        *            terminates if no tuple was emitted for the first time. If 
value is {@code null}, finite invocation is
+        *            disabled.
+        * @throws IllegalArgumentException
+        *             If the number of declared output attributes is not with 
range [0;25].
+        */
+       public SpoutWrapper(final IRichSpout spout, final Integer 
numberOfInvocations)
+                       throws IllegalArgumentException {
+               this(spout, (Collection<String>) null, numberOfInvocations);
+       }
+
+       /**
+        * Instantiates a new {@link SpoutWrapper} that calls the {@link 
IRichSpout#nextTuple() nextTuple()} method of
+        * the given {@link IRichSpout spout} in an infinite loop. The output 
type can be any type if parameter
+        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
+        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
+        * number of attributes.
+        * 
+        * @param spout
+        *            The {@link IRichSpout spout} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type. (Can be {@code null}.)
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
+                       throws IllegalArgumentException {
+               this(spout, Sets.newHashSet(rawOutputs), null);
+       }
+
+       /**
+        * Instantiates a new {@link SpoutWrapper} that calls the {@link 
IRichSpout#nextTuple() nextTuple()} method of
+        * the given {@link IRichSpout spout} a finite number of times. The 
output type can be any type if parameter
+        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
+        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
+        * number of attributes.
+        * 
+        * @param spout
+        *            The {@link IRichSpout spout} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type. (Can be {@code null}.)
+        * @param numberOfInvocations
+        *            The number of calls to {@link IRichSpout#nextTuple()}. If 
value is negative, {@link SpoutWrapper}
+        *            terminates if no tuple was emitted for the first time. If 
value is {@code null}, finite invocation is
+        *            disabled.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
+                       final Integer numberOfInvocations) throws 
IllegalArgumentException {
+               this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations);
+       }
+
+       /**
+        * Instantiates a new {@link SpoutWrapper} that calls the {@link 
IRichSpout#nextTuple() nextTuple()} method of
+        * the given {@link IRichSpout spout} in an infinite loop. The output 
type can be any type if parameter
+        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
+        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
+        * number of attributes.
+        * 
+        * @param spout
+        *            The {@link IRichSpout spout} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type. (Can be {@code null}.)
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       public SpoutWrapper(final IRichSpout spout, final Collection<String> 
rawOutputs)
+                       throws IllegalArgumentException {
+               this(spout, rawOutputs, null);
+       }
+
+       /**
+        * Instantiates a new {@link SpoutWrapper} that calls the {@link 
IRichSpout#nextTuple() nextTuple()} method of
+        * the given {@link IRichSpout spout} a finite number of times. The 
output type can be any type if parameter
+        * {@code rawOutput} is {@code true} and the spout's number of declared 
output tuples is 1. If {@code rawOutput} is
+        * {@code false} the output type will be one of {@link Tuple0} to 
{@link Tuple25} depending on the spout's declared
+        * number of attributes.
+        * 
+        * @param spout
+        *            The {@link IRichSpout spout} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type {@link Tuple1} but be
+        *            of a raw type. (Can be {@code null}.)
+        * @param numberOfInvocations
+        *            The number of calls to {@link IRichSpout#nextTuple()}. If 
value is negative, {@link SpoutWrapper}
+        *            terminates if no tuple was emitted for the first time. If 
value is {@code null}, finite invocation is
+        *            disabled.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       public SpoutWrapper(final IRichSpout spout, final Collection<String> 
rawOutputs,
+                       final Integer numberOfInvocations) throws 
IllegalArgumentException {
+               this.spout = spout;
+               this.numberOfAttributes = 
WrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
+               this.numberOfInvocations = numberOfInvocations;
+       }
+
+       /**
+        * Sets the original Storm topology.
+        * 
+        * @param stormTopology
+        *            The original Storm topology.
+        */
+       public void setStormTopology(StormTopology stormTopology) {
+               this.stormTopology = stormTopology;
+       }
+
+       @Override
+       public final void run(final SourceContext<OUT> ctx) throws Exception {
+               this.collector = new 
SpoutCollector<OUT>(this.numberOfAttributes, ctx);
+
+               GlobalJobParameters config = 
super.getRuntimeContext().getExecutionConfig()
+                               .getGlobalJobParameters();
+               StormConfig stormConfig = new StormConfig();
+
+               if (config != null) {
+                       if (config instanceof StormConfig) {
+                               stormConfig = (StormConfig) config;
+                       } else {
+                               stormConfig.putAll(config.toMap());
+                       }
+               }
+
+               this.spout.open(stormConfig, 
WrapperSetupHelper.createTopologyContext(
+                               (StreamingRuntimeContext) 
super.getRuntimeContext(), this.spout,
+                               this.stormTopology, stormConfig), new 
SpoutOutputCollector(this.collector));
+               this.spout.activate();
+
+               if (numberOfInvocations == null) {
+                       if (this.spout instanceof FiniteSpout) {
+                               final FiniteSpout finiteSpout = (FiniteSpout) 
this.spout;
+
+                               while (this.isRunning && 
!finiteSpout.reachedEnd()) {
+                                       finiteSpout.nextTuple();
+                               }
+                       } else {
+                               while (this.isRunning) {
+                                       this.spout.nextTuple();
+                               }
+                       }
+               } else {
+                       int counter = this.numberOfInvocations;
+                       if (counter >= 0) {
+                               while ((--counter >= 0) && this.isRunning) {
+                                       this.spout.nextTuple();
+                               }
+                       } else {
+                               do {
+                                       this.collector.tupleEmitted = false;
+                                       this.spout.nextTuple();
+                               } while (this.collector.tupleEmitted && 
this.isRunning);
+                       }
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        * <p/>
+        * Sets the {@link #isRunning} flag to {@code false}.
+        */
+       @Override
+       public void cancel() {
+               this.isRunning = false;
+       }
+
+       @Override
+       public void close() throws Exception {
+               this.spout.close();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
new file mode 100644
index 0000000..c9ab8e5
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+/*
+ * We do neither import
+ *             backtype.storm.tuple.Tuple;
+ * nor
+ *             org.apache.flink.api.java.tuple.Tuple
+ * to avoid confusion
+ */
+
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.MessageId;
+import backtype.storm.tuple.Values;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm 
tuple.
+ */
+class StormTuple<IN> implements backtype.storm.tuple.Tuple {
+
+       /** The Storm representation of the original Flink tuple */
+       private final Values stormTuple;
+       /** The schema (ie, ordered field names) of the tuple */
+       private final Fields schema;
+
+       /**
+        * Create a new Storm tuple from the given Flink tuple. The provided 
{@code nameIndexMap} is ignored for raw input
+        * types.
+        * 
+        * @param flinkTuple
+        *              The Flink tuple to be converted.
+        * @param schema
+        *              The schema (ie, ordered field names) of the tuple.
+        */
+       StormTuple(final IN flinkTuple, final Fields schema) {
+               if (flinkTuple instanceof 
org.apache.flink.api.java.tuple.Tuple) {
+                       this.schema = schema;
+                       final org.apache.flink.api.java.tuple.Tuple t = 
(org.apache.flink.api.java.tuple.Tuple) flinkTuple;
+
+                       final int numberOfAttributes = t.getArity();
+                       this.stormTuple = new Values();
+                       for (int i = 0; i < numberOfAttributes; ++i) {
+                               this.stormTuple.add(t.getField(i));
+                       }
+               } else {
+                       this.schema = null;
+                       this.stormTuple = new Values(flinkTuple);
+               }
+       }
+
+       @Override
+       public int size() {
+               return this.stormTuple.size();
+       }
+
+       @Override
+       public boolean contains(final String field) {
+               if (this.schema != null) {
+                       return this.schema.contains(field);
+               }
+
+               try {
+                       this.getPublicMemberField(field);
+                       return true;
+               } catch (NoSuchFieldException f) {
+                       try {
+                               this.getGetterMethod(field);
+                               return true;
+                       } catch (Exception g) {
+                               return false;
+                       }
+               } catch (Exception e) {
+                       return false;
+               }
+       }
+
+       @Override
+       public Fields getFields() {
+               return this.schema;
+       }
+
+       @Override
+       public int fieldIndex(final String field) {
+               return this.schema.fieldIndex(field);
+       }
+
+       @Override
+       public List<Object> select(final Fields selector) {
+               return this.schema.select(selector, this.stormTuple);
+       }
+
+       @Override
+       public Object getValue(final int i) {
+               return this.stormTuple.get(i);
+       }
+
+       @Override
+       public String getString(final int i) {
+               return (String) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Integer getInteger(final int i) {
+               return (Integer) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Long getLong(final int i) {
+               return (Long) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Boolean getBoolean(final int i) {
+               return (Boolean) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Short getShort(final int i) {
+               return (Short) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Byte getByte(final int i) {
+               return (Byte) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Double getDouble(final int i) {
+               return (Double) this.stormTuple.get(i);
+       }
+
+       @Override
+       public Float getFloat(final int i) {
+               return (Float) this.stormTuple.get(i);
+       }
+
+       @Override
+       public byte[] getBinary(final int i) {
+               return (byte[]) this.stormTuple.get(i);
+       }
+
+       private Field getPublicMemberField(final String field) throws Exception 
{
+               assert (this.stormTuple.size() == 1);
+               return this.stormTuple.get(0).getClass().getField(field);
+       }
+
+       private Method getGetterMethod(final String field) throws Exception {
+               assert (this.stormTuple.size() == 1);
+               return this.stormTuple
+                               .get(0)
+                               .getClass()
+                               .getMethod("get" + 
Character.toUpperCase(field.charAt(0)) + field.substring(1),
+                                               (Class<?>[]) null);
+       }
+
+       private Object getValueByPublicMember(final String field) throws 
Exception {
+               assert (this.stormTuple.size() == 1);
+               return getPublicMemberField(field).get(this.stormTuple.get(0));
+       }
+
+       private Object getValueByGetter(final String field) throws Exception {
+               assert (this.stormTuple.size() == 1);
+               return getGetterMethod(field).invoke(this.stormTuple.get(0), 
(Object[]) null);
+       }
+
+       @SuppressWarnings("unchecked")
+       public <T> T getValueByName(final String field) {
+               if (this.schema != null) {
+                       return (T) this.getValue(this.schema.fieldIndex(field));
+               }
+               assert (this.stormTuple.size() == 1);
+
+               Exception e;
+               try {
+                       // try public member
+                       return (T) getValueByPublicMember(field);
+               } catch (NoSuchFieldException f) {
+                       try {
+                               // try getter-method
+                               return (T) getValueByGetter(field);
+                       } catch (Exception g) {
+                               e = g;
+                       }
+               } catch (Exception f) {
+                       e = f;
+               }
+
+               throw new RuntimeException("Could not access field <" + field + 
">", e);
+       }
+
+       @Override
+       public Object getValueByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public String getStringByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Integer getIntegerByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Long getLongByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Boolean getBooleanByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Short getShortByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Byte getByteByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Double getDoubleByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public Float getFloatByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public byte[] getBinaryByField(final String field) {
+               return getValueByName(field);
+       }
+
+       @Override
+       public List<Object> getValues() {
+               return this.stormTuple;
+       }
+
+       @Override
+       public GlobalStreamId getSourceGlobalStreamid() {
+               // not sure if Flink can support this
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getSourceComponent() {
+               // not sure if Flink can support this
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int getSourceTask() {
+               // not sure if Flink can support this
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public String getSourceStreamId() {
+               // not sure if Flink can support this
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public MessageId getMessageId() {
+               // not sure if Flink can support this
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public int hashCode() {
+               final int prime = 31;
+               int result = 1;
+               result = (prime * result) + ((this.stormTuple == null) ? 0 : 
this.stormTuple.hashCode());
+               return result;
+       }
+
+       @Override
+       public boolean equals(final Object obj) {
+               if (this == obj) {
+                       return true;
+               }
+               if (obj == null) {
+                       return false;
+               }
+               if (this.getClass() != obj.getClass()) {
+                       return false;
+               }
+               final StormTuple<?> other = (StormTuple<?>) obj;
+               if (this.stormTuple == null) {
+                       if (other.stormTuple != null) {
+                               return false;
+                       }
+               } else if (!this.stormTuple.equals(other.stormTuple)) {
+                       return false;
+               }
+               return true;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
new file mode 100644
index 0000000..d529b6a
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.Config;
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.StreamInfo;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IComponent;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
+import clojure.lang.Atom;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * {@link WrapperSetupHelper} is an helper class used by {@link 
AbstractStormSpoutWrapper} and
+ * {@link BoltWrapper}.
+ */
+class WrapperSetupHelper {
+
+       /** The configuration key for the topology name. */
+       final static String TOPOLOGY_NAME = "storm.topology.name";
+
+       /**
+        * Computes the number of output attributes used by a {@link 
AbstractStormSpoutWrapper} or {@link BoltWrapper}
+        * per declared output stream. The number is {@code -1} for raw output 
type or a value within range [0;25] for
+        * output type {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to
+        * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}.
+        * 
+        * @param spoutOrBolt
+        *            The Storm {@link IRichSpout spout} or {@link IRichBolt 
bolt} to be used.
+        * @param rawOutputs
+        *            Contains stream names if a single attribute output 
stream, should not be of type
+        *            {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but 
be of a raw type. (Can be {@code null}.)
+        * @return The number of attributes to be used for each stream.
+        * @throws IllegalArgumentException
+        *             If {@code rawOuput} is {@code true} and the number of 
declared output attributes is not 1 or if
+        *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
+        *             [0;25].
+        */
+       static HashMap<String, Integer> getNumberOfAttributes(final IComponent 
spoutOrBolt,
+                       final Collection<String> rawOutputs)
+                                       throws IllegalArgumentException {
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+               spoutOrBolt.declareOutputFields(declarer);
+
+               for (Entry<String, Integer> schema : 
declarer.outputSchemas.entrySet()) {
+                       int declaredNumberOfAttributes = schema.getValue();
+                       if ((declaredNumberOfAttributes < 0) || 
(declaredNumberOfAttributes > 25)) {
+                               throw new IllegalArgumentException(
+                                               "Provided bolt declares non 
supported number of output attributes. Must be in range [0;25] but "
+                                                               + "was " + 
declaredNumberOfAttributes);
+                       }
+
+                       if (rawOutputs != null && 
rawOutputs.contains(schema.getKey())) {
+                               if (declaredNumberOfAttributes != 1) {
+                                       throw new IllegalArgumentException(
+                                                       "Ouput type is 
requested to be raw type, but provided bolt declares more then one output "
+                                                                       + 
"attribute.");
+                               }
+                               schema.setValue(-1);
+                       }
+               }
+
+               return declarer.outputSchemas;
+       }
+
+       /** Used to computed unique task IDs for a Storm topology. */
+       private static int tid;
+
+       /**
+        * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, 
Flink task / Storm executor).
+        * 
+        * @param context
+        *            The Flink runtime context.
+        * @param spoutOrBolt
+        *            The Spout or Bolt this context is created for.
+        * @param stormTopology
+        *            The original Storm topology.
+        * @param stormConfig
+        *            The user provided configuration.
+        * @return The created {@link TopologyContext}.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       static synchronized TopologyContext createTopologyContext(
+                       final StreamingRuntimeContext context, final IComponent 
spoutOrBolt,
+                       StormTopology stormTopology, Map stormConfig) {
+               String operatorName = context.getTaskName();
+               if (operatorName.startsWith("Source: ")) {
+                       // prefix "Source: " is inserted by Flink sources by 
default -- need to get rid of it here
+                       operatorName = operatorName.substring(8);
+               }
+               final int dop = context.getNumberOfParallelSubtasks();
+
+               final Map<Integer, String> taskToComponents = new 
HashMap<Integer, String>();
+               final Map<String, List<Integer>> componentToSortedTasks = new 
HashMap<String, List<Integer>>();
+               final Map<String, Map<String, Fields>> 
componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
+               String stormId = (String) stormConfig.get(TOPOLOGY_NAME);
+               String codeDir = null; // not supported
+               String pidDir = null; // not supported
+               Integer taskId = null;
+               Integer workerPort = null; // not supported
+               List<Integer> workerTasks = new ArrayList<Integer>();
+               final Map<String, Object> defaultResources = new 
HashMap<String, Object>();
+               final Map<String, Object> userResources = new HashMap<String, 
Object>();
+               final Map<String, Object> executorData = new HashMap<String, 
Object>();
+               final Map registeredMetrics = new HashMap();
+               Atom openOrPrepareWasCalled = null;
+
+               if (stormTopology == null) {
+                       // embedded mode
+                       ComponentCommon common = new ComponentCommon();
+                       common.set_parallelism_hint(dop);
+
+                       HashMap<String, SpoutSpec> spouts = new HashMap<String, 
SpoutSpec>();
+                       HashMap<String, Bolt> bolts = new HashMap<String, 
Bolt>();
+                       if (spoutOrBolt instanceof IRichSpout) {
+                               spouts.put(operatorName, new SpoutSpec(null, 
common));
+                       } else {
+                               assert (spoutOrBolt instanceof IRichBolt);
+                               bolts.put(operatorName, new Bolt(null, common));
+                       }
+                       stormTopology = new StormTopology(spouts, bolts, new 
HashMap<String, StateSpoutSpec>());
+
+                       taskId = context.getIndexOfThisSubtask();
+
+                       List<Integer> sortedTasks = new ArrayList<Integer>(dop);
+                       for (int i = 1; i <= dop; ++i) {
+                               taskToComponents.put(i, operatorName);
+                               sortedTasks.add(i);
+                       }
+                       componentToSortedTasks.put(operatorName, sortedTasks);
+
+                       SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+                       spoutOrBolt.declareOutputFields(declarer);
+                       componentToStreamToFields.put(operatorName, 
declarer.outputStreams);
+               } else {
+                       // whole topology is built (ie, FlinkTopologyBuilder is 
used)
+                       Map<String, SpoutSpec> spouts = 
stormTopology.get_spouts();
+                       Map<String, Bolt> bolts = stormTopology.get_bolts();
+                       Map<String, StateSpoutSpec> stateSpouts = 
stormTopology.get_state_spouts();
+
+                       tid = 1;
+
+                       for (Entry<String, SpoutSpec> spout : 
spouts.entrySet()) {
+                               Integer rc = 
processSingleOperator(spout.getKey(), spout.getValue().get_common(),
+                                               operatorName, 
context.getIndexOfThisSubtask(), dop, taskToComponents,
+                                               componentToSortedTasks, 
componentToStreamToFields);
+                               if (rc != null) {
+                                       taskId = rc;
+                               }
+                       }
+                       for (Entry<String, Bolt> bolt : bolts.entrySet()) {
+                               Integer rc = 
processSingleOperator(bolt.getKey(), bolt.getValue().get_common(),
+                                               operatorName, 
context.getIndexOfThisSubtask(), dop, taskToComponents,
+                                               componentToSortedTasks, 
componentToStreamToFields);
+                               if (rc != null) {
+                                       taskId = rc;
+                               }
+                       }
+                       for (Entry<String, StateSpoutSpec> stateSpout : 
stateSpouts.entrySet()) {
+                               Integer rc = taskId = 
processSingleOperator(stateSpout.getKey(), stateSpout
+                                               .getValue().get_common(), 
operatorName, context.getIndexOfThisSubtask(),
+                                               dop, taskToComponents, 
componentToSortedTasks, componentToStreamToFields);
+                               if (rc != null) {
+                                       taskId = rc;
+                               }
+                       }
+                       assert (taskId != null);
+               }
+
+               if 
(!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
+                       stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 
30); // Storm default value
+               }
+
+               return new FlinkTopologyContext(stormTopology, stormConfig, 
taskToComponents,
+                               componentToSortedTasks, 
componentToStreamToFields, stormId, codeDir, pidDir,
+                               taskId, workerPort, workerTasks, 
defaultResources, userResources, executorData,
+                               registeredMetrics, openOrPrepareWasCalled);
+       }
+
+       /**
+        * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, 
and {@code componentToStreamToFields} for a
+        * single instance of a Spout or Bolt (ie, task or executor). 
Furthermore, is computes the unique task-id.
+        * 
+        * @param componentId
+        *            The ID of the Spout/Bolt in the topology.
+        * @param common
+        *            The common operator object (that is all Spouts and Bolts 
have).
+        * @param operatorName
+        *            The Flink operator name.
+        * @param index
+        *            The index of the currently processed tasks with its 
operator.
+        * @param dop
+        *            The parallelism of the operator.
+        * @param taskToComponents
+        *            OUTPUT: A map from all task IDs of the topology to their 
component IDs.
+        * @param componentToSortedTasks
+        *            OUTPUT: A map from all component IDs to their sorted list 
of corresponding task IDs.
+        * @param componentToStreamToFields
+        *            OUTPUT: A map from all component IDs to there output 
streams and output fields.
+        * 
+        * @return A unique task ID if the currently processed Spout or Bolt 
({@code componentId}) is equal to the current
+        *         Flink operator ({@link operatorName}) -- {@code null} 
otherwise.
+        */
+       private static Integer processSingleOperator(final String componentId,
+                       final ComponentCommon common, final String 
operatorName, final int index,
+                       final int dop, final Map<Integer, String> 
taskToComponents,
+                       final Map<String, List<Integer>> componentToSortedTasks,
+                       final Map<String, Map<String, Fields>> 
componentToStreamToFields) {
+               final int parallelism_hint = common.get_parallelism_hint();
+               Integer taskId = null;
+
+               if (componentId.equals(operatorName)) {
+                       taskId = tid + index;
+               }
+
+               List<Integer> sortedTasks = new ArrayList<Integer>(dop);
+               for (int i = 0; i < parallelism_hint; ++i) {
+                       taskToComponents.put(tid, componentId);
+                       sortedTasks.add(tid);
+                       ++tid;
+               }
+               componentToSortedTasks.put(componentId, sortedTasks);
+
+               if (componentId.equals(operatorName)) {
+               }
+
+               Map<String, Fields> outputStreams = new HashMap<String, 
Fields>();
+               for(Entry<String, StreamInfo> outStream : 
common.get_streams().entrySet()) {
+                       outputStreams.put(outStream.getKey(), new 
Fields(outStream.getValue().get_output_fields()));
+               }
+               componentToStreamToFields.put(componentId, outputStreams);
+
+               return taskId;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..49de476
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.storm.api.FlinkOutputFieldsDeclarer;
+import org.apache.flink.storm.util.AbstractTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.LinkedList;
+
+public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
+
+
+
+       @Test
+       public void testNull() {
+               Assert.assertNull(new 
FlinkOutputFieldsDeclarer().getOutputType(null));
+       }
+
+       @Test
+       public void testDeclare() {
+               for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
+                       for (int j = 1; j < 2; ++j) { // number of streams
+                               for (int k = 0; k <= 25; ++k) { // number of 
attributes
+                                       this.runDeclareTest(i, j, k);
+                               }
+                       }
+               }
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareSimpleToManyAttributes() {
+               this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareNonDirectToManyAttributes() {
+               this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareDefaultStreamToManyAttributes() {
+               this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareFullToManyAttributes() {
+               this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
+       }
+
+       private void runDeclareTest(final int testCase, final int 
numberOfStreams,
+                       final int numberOfAttributes) {
+               final FlinkOutputFieldsDeclarer declarer = new 
FlinkOutputFieldsDeclarer();
+
+               String[] streams = null;
+               if (numberOfStreams > 1 || r.nextBoolean()) {
+                       streams = new String[numberOfStreams];
+                       for (int i = 0; i < numberOfStreams; ++i) {
+                               streams[i] = "stream" + i;
+                       }
+               }
+
+               final String[] attributes = new String[numberOfAttributes];
+               for (int i = 0; i < attributes.length; ++i) {
+                       attributes[i] = "a" + i;
+               }
+
+               switch (testCase) {
+               case 0:
+                       this.declareSimple(declarer, streams, attributes);
+                       break;
+               default:
+                       this.declareNonDirect(declarer, streams, attributes);
+               }
+
+               if (streams == null) {
+                       streams = new String[] { Utils.DEFAULT_STREAM_ID };
+               }
+
+               for (String stream : streams) {
+                       final TypeInformation<?> type = 
declarer.getOutputType(stream);
+
+                       if (numberOfAttributes == 1) {
+                               Assert.assertEquals(type.getClass(), 
GenericTypeInfo.class);
+                               Assert.assertEquals(type.getTypeClass(), 
Object.class);
+                       } else {
+                               Assert.assertEquals(numberOfAttributes, 
type.getArity());
+                               Assert.assertTrue(type.isTupleType());
+                       }
+               }
+       }
+
+       private void declareSimple(final FlinkOutputFieldsDeclarer declarer, 
final String[] streams,
+                       final String[] attributes) {
+
+               if (streams != null) {
+                       for (String stream : streams) {
+                               declarer.declareStream(stream, new 
Fields(attributes));
+                       }
+               } else {
+                       declarer.declare(new Fields(attributes));
+               }
+       }
+
+       private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, 
final String[] streams,
+                       final String[] attributes) {
+
+               if (streams != null) {
+                       for (String stream : streams) {
+                               declarer.declareStream(stream, false, new 
Fields(attributes));
+                       }
+               } else {
+                       declarer.declare(false, new Fields(attributes));
+               }
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testUndeclared() {
+               final FlinkOutputFieldsDeclarer declarer = new 
FlinkOutputFieldsDeclarer();
+               declarer.getOutputType("unknownStreamId");
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testDeclareDirect() {
+               new FlinkOutputFieldsDeclarer().declare(true, null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testDeclareDirect2() {
+               new 
FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
+       }
+
+       @Test
+       public void testGetGroupingFieldIndexes() {
+               final int numberOfAttributes = 5 + this.r.nextInt(21);
+               final String[] attributes = new String[numberOfAttributes];
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       attributes[i] = "a" + i;
+               }
+
+               final FlinkOutputFieldsDeclarer declarer = new 
FlinkOutputFieldsDeclarer();
+               declarer.declare(new Fields(attributes));
+
+               final int numberOfKeys = 1 + this.r.nextInt(25);
+               final LinkedList<String> groupingFields = new 
LinkedList<String>();
+               final boolean[] indexes = new boolean[numberOfAttributes];
+
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       if (this.r.nextInt(26) < numberOfKeys) {
+                               groupingFields.add(attributes[i]);
+                               indexes[i] = true;
+                       } else {
+                               indexes[i] = false;
+                       }
+               }
+
+               final int[] expectedResult = new int[groupingFields.size()];
+               int j = 0;
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       if (indexes[i]) {
+                               expectedResult[j++] = i;
+                       }
+               }
+
+               final int[] result = 
declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
+                               groupingFields);
+
+               Assert.assertEquals(expectedResult.length, result.length);
+               for (int i = 0; i < expectedResult.length; ++i) {
+                       Assert.assertEquals(expectedResult[i], result[i]);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
new file mode 100644
index 0000000..e6fb8e5
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyBuilderTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.util.TestSink;
+import org.junit.Test;
+
+import backtype.storm.tuple.Fields;
+
+public class FlinkTopologyBuilderTest {
+
+       @Test(expected = RuntimeException.class)
+       public void testUnknowSpout() {
+               FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+               builder.setSpout("spout", new TestSpout());
+               builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("unknown");
+               builder.createTopology();
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void testUnknowBolt() {
+               FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+               builder.setSpout("spout", new TestSpout());
+               builder.setBolt("bolt1", new 
TestBolt()).shuffleGrouping("spout");
+               builder.setBolt("bolt2", new 
TestBolt()).shuffleGrouping("unknown");
+               builder.createTopology();
+       }
+
+       @Test(expected = RuntimeException.class)
+       public void testUndeclaredStream() {
+               FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+               builder.setSpout("spout", new TestSpout());
+               builder.setBolt("bolt", new 
TestBolt()).shuffleGrouping("spout");
+               builder.createTopology();
+       }
+
+       @Test
+       public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
+               FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
+
+               flinkBuilder.setSpout("spout", new TestDummySpout());
+               flinkBuilder.setBolt("sink", new 
TestSink()).fieldsGrouping("spout",
+                               TestDummySpout.spoutStreamId, new Fields("id"));
+
+               flinkBuilder.createTopology();
+       }
+
+       @Test
+       public void testFieldsGroupingOnMultipleBoltOutputStreams() {
+               FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
+
+               flinkBuilder.setSpout("spout", new TestDummySpout());
+               flinkBuilder.setBolt("bolt", new 
TestDummyBolt()).shuffleGrouping("spout");
+               flinkBuilder.setBolt("sink", new 
TestSink()).fieldsGrouping("bolt",
+                               TestDummyBolt.groupingStreamId, new 
Fields("id"));
+
+               flinkBuilder.createTopology();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
new file mode 100644
index 0000000..9d04ca5
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import org.apache.flink.storm.api.FlinkTopology;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlinkTopologyTest {
+
+       @Test
+       public void testDefaultParallelism() {
+               final FlinkTopology topology = new FlinkTopology();
+               Assert.assertEquals(1, topology.getParallelism());
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testExecute() throws Exception {
+               new FlinkTopology().execute();
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testExecuteWithName() throws Exception {
+               new FlinkTopology().execute(null);
+       }
+
+       @Test
+       public void testNumberOfTasks() {
+               final FlinkTopology topology = new FlinkTopology();
+
+               Assert.assertEquals(0, topology.getNumberOfTasks());
+
+               topology.increaseNumberOfTasks(3);
+               Assert.assertEquals(3, topology.getNumberOfTasks());
+
+               topology.increaseNumberOfTasks(2);
+               Assert.assertEquals(5, topology.getNumberOfTasks());
+
+               topology.increaseNumberOfTasks(8);
+               Assert.assertEquals(13, topology.getNumberOfTasks());
+       }
+
+       @Test(expected = AssertionError.class)
+       public void testAssert() {
+               new FlinkTopology().increaseNumberOfTasks(0);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
new file mode 100644
index 0000000..74ea67e
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+public class TestBolt implements IRichBolt {
+       private static final long serialVersionUID = -667148827441397683L;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {}
+
+       @Override
+       public void execute(Tuple input) {}
+
+       @Override
+       public void cleanup() {}
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
new file mode 100644
index 0000000..4abb604
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+public class TestSpout implements IRichSpout {
+       private static final long serialVersionUID = -4884029383198924007L;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {}
+
+       @Override
+       public void close() {}
+
+       @Override
+       public void activate() {}
+
+       @Override
+       public void deactivate() {}
+
+       @Override
+       public void nextTuple() {}
+
+       @Override
+       public void ack(Object msgId) {}
+
+       @Override
+       public void fail(Object msgId) {}
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java
new file mode 100644
index 0000000..7bea94c
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestTopologyBuilder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+import backtype.storm.generated.StormTopology;
+
+public class TestTopologyBuilder extends FlinkTopologyBuilder {
+       @Override
+       public StormTopology getStormTopology() {
+               return super.getStormTopology();
+       }
+}

Reply via email to