http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
new file mode 100644
index 0000000..71a5e8d
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
+ * same way as to a Storm {@link LocalCluster}. In contrast to {@link 
WordCountLocal} all bolts access the field of
+ * input tuples by name instead of index.
+ * <p/>
+ * This example shows how to run program directly within Java, thus it cannot 
be used to submit a {@link StormTopology}
+ * via Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCountLocalByName &lt;text path&gt; &lt;result 
path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink
+ * </ul>
+ */
+public class WordCountLocalByName {
+       public final static String topologyId = "Storm WordCountName";
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!WordCountTopology.parseParameters(args)) {
+                       return;
+               }
+
+               // build Topology the Storm way
+               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology(false);
+
+               // execute program locally
+               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
+               cluster.submitTopology(topologyId, null, 
builder.createTopology());
+
+               Utils.sleep(10 * 1000);
+
+               // TODO kill does no do anything so far
+               cluster.killTopology(topologyId);
+               cluster.shutdown();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
new file mode 100644
index 0000000..2e4fb03
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkClient;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
+ * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink 
cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java, thus it cannot be 
used to submit a {@link StormTopology} via
+ * Flink command line clients (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCountRemoteByClient &lt;text path&gt; &lt;result 
path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
+ * </ul>
+ */
+public class WordCountRemoteByClient {
+       public final static String topologyId = "Storm WordCount";
+       private final static String uploadedJarLocation = 
"target/WordCount-StormTopology.jar";
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws 
AlreadyAliveException, InvalidTopologyException,
+       NotAliveException {
+
+               if (!WordCountTopology.parseParameters(args)) {
+                       return;
+               }
+
+               // build Topology the Storm way
+               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
+
+               // execute program on Flink cluster
+               final Config conf = new Config();
+               // can be changed to remote address
+               conf.put(Config.NIMBUS_HOST, "localhost");
+               // use default flink jobmanger.rpc.port
+               conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+
+               final FlinkClient cluster = 
FlinkClient.getConfiguredClient(conf);
+               cluster.submitTopology(topologyId, uploadedJarLocation, 
builder.createTopology());
+
+               Utils.sleep(5 * 1000);
+
+               cluster.killTopology(topologyId);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
new file mode 100644
index 0000000..173074c
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkClient;
+import org.apache.flink.storm.api.FlinkSubmitter;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
+ * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink 
cluster can be local or remote.
+ * <p/>
+ * This example shows how to submit the program via Java as well as Flink's 
command line client (ie, bin/flink).
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage: <code>WordCountRemoteBySubmitter &lt;text path&gt; &lt;result 
path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
+ * </ul>
+ */
+public class WordCountRemoteBySubmitter {
+       public final static String topologyId = "Storm WordCount";
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!WordCountTopology.parseParameters(args)) {
+                       return;
+               }
+
+               // build Topology the Storm way
+               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
+
+               // execute program on Flink cluster
+               final Config conf = new Config();
+               // We can set Jobmanager host/port values manually or leave 
them blank
+               // if not set and
+               // - executed within Java, default values "localhost" and 
"6123" are set by FlinkSubmitter
+               // - executed via bin/flink values from flink-conf.yaml are set 
by FlinkSubmitter.
+               // conf.put(Config.NIMBUS_HOST, "localhost");
+               // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
+
+               // The user jar file must be specified via JVM argument if 
executed via Java.
+               // => -Dstorm.jar=target/WordCount-StormTopology.jar
+               // If bin/flink is used, the jar file is detected automatically.
+               FlinkSubmitter.submitTopology(topologyId, conf, 
builder.createTopology());
+
+               Thread.sleep(5 * 1000);
+
+               FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
new file mode 100644
index 0000000..8ee374d
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.util.OutputFormatter;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+import org.apache.flink.storm.wordcount.operators.BoltCounter;
+import org.apache.flink.storm.wordcount.operators.BoltCounterByName;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizer;
+import org.apache.flink.storm.wordcount.operators.BoltTokenizerByName;
+import org.apache.flink.storm.wordcount.operators.WordCountFileSpout;
+import org.apache.flink.storm.wordcount.operators.WordCountInMemorySpout;
+
+/**
+ * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
+ * fashion. The program is constructed as a regular {@link StormTopology}.
+ * <p/>
+ * <p/>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p/>
+ * <p/>
+ * Usage:
+ * <code>WordCount[Local|LocalByName|RemoteByClient|RemoteBySubmitter] 
&lt;text path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * <p/>
+ * <p/>
+ * This example shows how to:
+ * <ul>
+ * <li>how to construct a regular Storm topology as Flink program</li>
+ * </ul>
+ */
+public class WordCountTopology {
+       public final static String spoutId = "source";
+       public final static String tokenierzerId = "tokenizer";
+       public final static String counterId = "counter";
+       public final static String sinkId = "sink";
+       private final static OutputFormatter formatter = new 
TupleOutputFormatter();
+
+       public static FlinkTopologyBuilder buildTopology() {
+               return buildTopology(true);
+       }
+
+       public static FlinkTopologyBuilder buildTopology(boolean indexOrName) {
+
+               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+               // get input data
+               if (fileInputOutput) {
+                       // read the text file from given input path
+                       final String[] tokens = textPath.split(":");
+                       final String inputFile = tokens[tokens.length - 1];
+                       builder.setSpout(spoutId, new 
WordCountFileSpout(inputFile));
+               } else {
+                       builder.setSpout(spoutId, new WordCountInMemorySpout());
+               }
+
+               if (indexOrName) {
+                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
+                       builder.setBolt(tokenierzerId, new BoltTokenizer(), 
4).shuffleGrouping(spoutId);
+                       // group by the tuple field "0" and sum up tuple field 
"1"
+                       builder.setBolt(counterId, new BoltCounter(), 
4).fieldsGrouping(tokenierzerId,
+                                       new 
Fields(BoltTokenizer.ATTRIBUTE_WORD));
+               } else {
+                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
+                       builder.setBolt(tokenierzerId, new 
BoltTokenizerByName(), 4).shuffleGrouping(
+                                       spoutId);
+                       // group by the tuple field "0" and sum up tuple field 
"1"
+                       builder.setBolt(counterId, new BoltCounterByName(), 
4).fieldsGrouping(
+                                       tokenierzerId, new 
Fields(BoltTokenizerByName.ATTRIBUTE_WORD));
+               }
+
+               // emit result
+               if (fileInputOutput) {
+                       // read the text file from given input path
+                       final String[] tokens = outputPath.split(":");
+                       final String outputFile = tokens[tokens.length - 1];
+                       builder.setBolt(sinkId, new BoltFileSink(outputFile, 
formatter)).shuffleGrouping(counterId);
+               } else {
+                       builder.setBolt(sinkId, new BoltPrintSink(formatter), 
4).shuffleGrouping(counterId);
+               }
+
+               return builder;
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileInputOutput = false;
+       private static String textPath;
+       private static String outputPath;
+
+       static boolean parseParameters(final String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileInputOutput = true;
+                       if (args.length == 2) {
+                               textPath = args[0];
+                               outputPath = args[1];
+                       } else {
+                               System.err.println("Usage: WordCount* <text 
path> <result path>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing WordCount example with 
built-in default data");
+                       System.out.println("  Provide parameters to read input 
data from a file");
+                       System.out.println("  Usage: WordCount* <text path> 
<result path>");
+               }
+
+               return true;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
new file mode 100644
index 0000000..d21a584
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements the word counter that counts the occurrence of each unique word. 
The bolt takes a pair (input tuple
+ * schema: {@code <String,Integer>}) and sums the given word count for each 
unique word (output tuple schema:
+ * {@code <String,Integer>} ).
+ * <p>
+ * Same as {@link BoltCounterByName}, but accesses input attribute by index 
(instead of name).
+ */
+public class BoltCounter implements IRichBolt {
+       private static final long serialVersionUID = 399619605462625934L;
+
+       public static final String ATTRIBUTE_WORD = "word";
+       public static final String ATTRIBUTE_COUNT = "count";
+
+       private final HashMap<String, Count> counts = new HashMap<String, 
Count>();
+       private OutputCollector collector;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void execute(final Tuple input) {
+               final String word = 
input.getString(BoltTokenizer.ATTRIBUTE_WORD_INDEX);
+
+               Count currentCount = this.counts.get(word);
+               if (currentCount == null) {
+                       currentCount = new Count();
+                       this.counts.put(word, currentCount);
+               }
+               currentCount.count += 
input.getInteger(BoltTokenizer.ATTRIBUTE_COUNT_INDEX);
+
+               this.collector.emit(new Values(word, currentCount.count));
+       }
+
+       @Override
+       public void cleanup() {/* nothing to do */}
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+       /**
+        * A counter helper to emit immutable tuples to the given 
stormCollector and avoid unnecessary object
+        * creating/deletion.
+        */
+       private static final class Count {
+               public int count;
+
+               public Count() {/* nothing to do */}
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
new file mode 100644
index 0000000..d5c05d7
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltCounterByName.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implements the word counter that counts the occurrence of each unique word. 
The bolt takes a pair (input tuple
+ * schema: {@code <String,Integer>}) and sums the given word count for each 
unique word (output tuple schema:
+ * {@code <String,Integer>} ).
+ * <p>
+ * Same as {@link BoltCounter}, but accesses input attribute by name (instead 
of index).
+ */
+public class BoltCounterByName implements IRichBolt {
+       private static final long serialVersionUID = 399619605462625934L;
+
+       public static final String ATTRIBUTE_WORD = "word";
+       public static final String ATTRIBUTE_COUNT = "count";
+
+       private final HashMap<String, Count> counts = new HashMap<String, 
Count>();
+       private OutputCollector collector;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void execute(final Tuple input) {
+               final String word = 
input.getStringByField(BoltTokenizer.ATTRIBUTE_WORD);
+
+               Count currentCount = this.counts.get(word);
+               if (currentCount == null) {
+                       currentCount = new Count();
+                       this.counts.put(word, currentCount);
+               }
+               currentCount.count += 
input.getIntegerByField(BoltTokenizer.ATTRIBUTE_COUNT);
+
+               this.collector.emit(new Values(word, currentCount.count));
+       }
+
+       @Override
+       public void cleanup() {/* nothing to do */}
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+       /**
+        * A counter helper to emit immutable tuples to the given 
stormCollector and avoid unnecessary object
+        * creating/deletion.
+        */
+       private static final class Count {
+               public int count;
+
+               public Count() {/* nothing to do */}
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
new file mode 100644
index 0000000..74d6a99
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizer.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Implements the string tokenizer that splits sentences into words as a bolt. 
The bolt takes a line (input tuple
+ * schema: {@code <String>}) and splits it into multiple pairs in the form of 
"(word,1)" (output tuple schema:
+ * {@code <String,Integer>}).
+ * <p>
+ * Same as {@link BoltTokenizerByName}, but accesses input attribute by index 
(instead of name).
+ */
+public final class BoltTokenizer implements IRichBolt {
+       private static final long serialVersionUID = -8589620297208175149L;
+
+       public static final String ATTRIBUTE_WORD = "word";
+       public static final String ATTRIBUTE_COUNT = "count";
+
+       public static final int ATTRIBUTE_WORD_INDEX = 0;
+       public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+       private OutputCollector collector;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void execute(final Tuple input) {
+               final String[] tokens = 
input.getString(0).toLowerCase().split("\\W+");
+
+               for (final String token : tokens) {
+                       if (token.length() > 0) {
+                               this.collector.emit(new Values(token, 1));
+                       }
+               }
+       }
+
+       @Override
+       public void cleanup() {/* nothing to do */}
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
new file mode 100644
index 0000000..3c56b36
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/BoltTokenizerByName.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+/**
+ * Implements the string tokenizer that splits sentences into words as a bolt. 
The bolt takes a line (input tuple
+ * schema: {@code <String>}) and splits it into multiple pairs in the form of 
"(word,1)" (output tuple schema:
+ * {@code <String,Integer>}).
+ * <p>
+ * Same as {@link BoltTokenizer}, but accesses input attribute by name 
(instead of index).
+ */
+public final class BoltTokenizerByName implements IRichBolt {
+       private static final long serialVersionUID = -8589620297208175149L;
+
+       public static final String ATTRIBUTE_WORD = "word";
+       public static final String ATTRIBUTE_COUNT = "count";
+
+       public static final int ATTRIBUTE_WORD_INDEX = 0;
+       public static final int ATTRIBUTE_COUNT_INDEX = 1;
+
+       private OutputCollector collector;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void execute(final Tuple input) {
+               final String[] tokens = 
input.getStringByField("sentence").toLowerCase().split("\\W+");
+
+               for (final String token : tokens) {
+                       if (token.length() > 0) {
+                               this.collector.emit(new Values(token, 1));
+                       }
+               }
+       }
+
+       @Override
+       public void cleanup() {/* nothing to do */}
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
new file mode 100644
index 0000000..3a8fd3a
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataPojos.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import java.io.Serializable;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+public class WordCountDataPojos {
+       public static Sentence[] SENTENCES;
+
+       static {
+               SENTENCES = new Sentence[WordCountData.WORDS.length];
+               for (int i = 0; i < SENTENCES.length; ++i) {
+                       SENTENCES[i] = new Sentence(WordCountData.WORDS[i]);
+               }
+       }
+
+       public static class Sentence implements Serializable {
+               private static final long serialVersionUID = 
-7336372859203407522L;
+
+               private String sentence;
+
+               public Sentence() {
+               }
+
+               public Sentence(String sentence) {
+                       this.sentence = sentence;
+               }
+
+               public String getSentence() {
+                       return sentence;
+               }
+
+               public void setSentence(String sentence) {
+                       this.sentence = sentence;
+               }
+
+               @Override
+               public String toString() {
+                       return "(" + this.sentence + ")";
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
new file mode 100644
index 0000000..16e2ba0
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountDataTuple.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+
+@SuppressWarnings("unchecked")
+public class WordCountDataTuple {
+       public static Tuple1<String>[] TUPLES;
+
+       static {
+               TUPLES = new Tuple1[WordCountData.WORDS.length];
+               for (int i = 0; i < TUPLES.length; ++i) {
+                       TUPLES[i] = new Tuple1<String>(WordCountData.WORDS[i]);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
new file mode 100644
index 0000000..76a198f
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountFileSpout.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import org.apache.flink.storm.util.FileSpout;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Spout that reads data from a given local file.
+ */
+public final class WordCountFileSpout extends FileSpout {
+       private static final long serialVersionUID = 2372251989250954503L;
+
+       public WordCountFileSpout(String path) {
+               super(path);
+       }
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields("sentence"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
new file mode 100644
index 0000000..eb96160
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount.operators;
+
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.util.InMemorySpout;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Implements a Spout that reads data from {@link WordCountData#WORDS}.
+ */
+public final class WordCountInMemorySpout extends InMemorySpout<String> {
+       private static final long serialVersionUID = 8832143302409465843L;
+
+       public WordCountInMemorySpout() {
+               super(WordCountData.WORDS);
+       }
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields("sentence"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
new file mode 100644
index 0000000..781396c
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import org.apache.flink.storm.excamation.ExclamationWithBolt;
+import org.apache.flink.storm.exclamation.util.ExclamationData;
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class ExclamationWithBoltITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+       protected String exclamationNum;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+               this.exclamationNum = "3";
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExclamationWithBolt.main(new String[]{this.textPath, 
this.resultPath, this.exclamationNum});
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
new file mode 100644
index 0000000..36b8aed
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import org.apache.flink.storm.excamation.ExclamationWithSpout;
+import org.apache.flink.storm.exclamation.util.ExclamationData;
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class ExclamationWithSpoutITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExclamationWithSpout.main(new String[]{this.textPath, 
this.resultPath});
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
new file mode 100644
index 0000000..cec276f
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import org.apache.flink.storm.excamation.ExclamationLocal;
+import org.apache.flink.storm.exclamation.util.ExclamationData;
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class StormExclamationLocalITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+       protected String exclamationNum;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+               this.exclamationNum = "3";
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(ExclamationData.TEXT_WITH_EXCLAMATIONS, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               ExclamationLocal.main(new String[]{this.textPath, 
this.resultPath, this.exclamationNum});
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
new file mode 100644
index 0000000..3c435f9
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/util/ExclamationData.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation.util;
+
+public class ExclamationData {
+
+       public static final String TEXT_WITH_EXCLAMATIONS =
+                       "Goethe - Faust: Der Tragoedie erster Teil!!!!!!\n"
+                                       + "Prolog im Himmel.!!!!!!\n"
+                                       + "Der Herr. Die himmlischen 
Heerscharen. Nachher Mephistopheles. Die drei!!!!!!\n"
+                                       + "Erzengel treten vor.!!!!!!\n"
+                                       + "RAPHAEL: Die Sonne toent, nach alter 
Weise, In Brudersphaeren Wettgesang,!!!!!!\n"
+                                       + "Und ihre vorgeschriebne Reise 
Vollendet sie mit Donnergang. Ihr Anblick!!!!!!\n"
+                                       + "gibt den Engeln Staerke, Wenn keiner 
Sie ergruenden mag; die unbegreiflich!!!!!!\n"
+                                       + "hohen Werke Sind herrlich wie am 
ersten Tag.!!!!!!\n"
+                                       + "GABRIEL: Und schnell und 
unbegreiflich schnelle Dreht sich umher der Erde!!!!!!\n"
+                                       + "Pracht; Es wechselt Paradieseshelle 
Mit tiefer, schauervoller Nacht. Es!!!!!!\n"
+                                       + "schaeumt das Meer in breiten 
Fluessen Am tiefen Grund der Felsen auf, Und!!!!!!\n"
+                                       + "Fels und Meer wird fortgerissen Im 
ewig schnellem Sphaerenlauf.!!!!!!\n"
+                                       + "MICHAEL: Und Stuerme brausen um die 
Wette Vom Meer aufs Land, vom Land!!!!!!\n"
+                                       + "aufs Meer, und bilden wuetend eine 
Kette Der tiefsten Wirkung rings umher.!!!!!!\n"
+                                       + "Da flammt ein blitzendes Verheeren 
Dem Pfade vor des Donnerschlags. Doch!!!!!!\n"
+                                       + "deine Boten, Herr, verehren Das 
sanfte Wandeln deines Tags.!!!!!!\n"
+                                       + "ZU DREI: Der Anblick gibt den Engeln 
Staerke, Da keiner dich ergruenden!!!!!!\n"
+                                       + "mag, Und alle deine hohen Werke Sind 
herrlich wie am ersten Tag.!!!!!!\n"
+                                       + "MEPHISTOPHELES: Da du, o Herr, dich 
einmal wieder nahst Und fragst, wie!!!!!!\n"
+                                       + "alles sich bei uns befinde, Und du 
mich sonst gewoehnlich gerne sahst, So!!!!!!\n"
+                                       + "siehst du mich auch unter dem 
Gesinde. Verzeih, ich kann nicht hohe Worte!!!!!!\n"
+                                       + "machen, Und wenn mich auch der ganze 
Kreis verhoehnt; Mein Pathos braechte!!!!!!\n"
+                                       + "dich gewiss zum Lachen, Haettst du 
dir nicht das Lachen abgewoehnt. Von!!!!!!\n"
+                                       + "Sonn' und Welten weiss ich nichts zu 
sagen, Ich sehe nur, wie sich die!!!!!!\n"
+                                       + "Menschen plagen. Der kleine Gott der 
Welt bleibt stets von gleichem!!!!!!\n"
+                                       + "Schlag, Und ist so wunderlich als 
wie am ersten Tag. Ein wenig besser!!!!!!\n"
+                                       + "wuerd er leben, Haettst du ihm nicht 
den Schein des Himmelslichts gegeben;!!!!!!\n"
+                                       + "Er nennt's Vernunft und braucht's 
allein, Nur tierischer als jedes Tier!!!!!!\n"
+                                       + "zu sein. Er scheint mir, mit Verlaub 
von euer Gnaden, Wie eine der!!!!!!\n"
+                                       + "langbeinigen Zikaden, Die immer 
fliegt und fliegend springt Und gleich im!!!!!!\n"
+                                       + "Gras ihr altes Liedchen singt; Und 
laeg er nur noch immer in dem Grase! In!!!!!!\n"
+                                       + "jeden Quark begraebt er seine 
Nase.!!!!!!\n"
+                                       + "DER HERR: Hast du mir weiter nichts 
zu sagen? Kommst du nur immer!!!!!!\n"
+                                       + "anzuklagen? Ist auf der Erde ewig 
dir nichts recht?!!!!!!\n"
+                                       + "MEPHISTOPHELES: Nein Herr! ich find 
es dort, wie immer, herzlich!!!!!!\n"
+                                       + "schlecht. Die Menschen dauern mich 
in ihren Jammertagen, Ich mag sogar!!!!!!\n"
+                                       + "die armen selbst nicht 
plagen.!!!!!!\n" + "DER HERR: Kennst du den Faust?!!!!!!\n"
+                                       + "MEPHISTOPHELES: Den Doktor?!!!!!!\n"
+                                       + "DER HERR: Meinen Knecht!!!!!!!\n"
+                                       + "MEPHISTOPHELES: Fuerwahr! er dient 
Euch auf besondre Weise. Nicht irdisch!!!!!!\n"
+                                       + "ist des Toren Trank noch Speise. Ihn 
treibt die Gaerung in die Ferne, Er!!!!!!\n"
+                                       + "ist sich seiner Tollheit halb 
bewusst; Vom Himmel fordert er die schoensten!!!!!!\n"
+                                       + "Sterne Und von der Erde jede 
hoechste Lust, Und alle Naeh und alle Ferne!!!!!!\n"
+                                       + "Befriedigt nicht die tiefbewegte 
Brust.!!!!!!\n"
+                                       + "DER HERR: Wenn er mir auch nur 
verworren dient, So werd ich ihn bald in!!!!!!\n"
+                                       + "die Klarheit fuehren. Weiss doch der 
Gaertner, wenn das Baeumchen gruent, Das!!!!!!\n"
+                                       + "Bluet und Frucht die kuenft'gen 
Jahre zieren.!!!!!!\n"
+                                       + "MEPHISTOPHELES: Was wettet Ihr? den 
sollt Ihr noch verlieren! Wenn Ihr!!!!!!\n"
+                                       + "mir die Erlaubnis gebt, Ihn meine 
Strasse sacht zu fuehren.!!!!!!\n"
+                                       + "DER HERR: Solang er auf der Erde 
lebt, So lange sei dir's nicht verboten,!!!!!!\n"
+                                       + "Es irrt der Mensch so lang er 
strebt.!!!!!!\n"
+                                       + "MEPHISTOPHELES: Da dank ich Euch; 
denn mit den Toten Hab ich mich niemals!!!!!!\n"
+                                       + "gern befangen. Am meisten lieb ich 
mir die vollen, frischen Wangen. Fuer!!!!!!\n"
+                                       + "einem Leichnam bin ich nicht zu 
Haus; Mir geht es wie der Katze mit der Maus.!!!!!!\n"
+                                       + "DER HERR: Nun gut, es sei dir 
ueberlassen! Zieh diesen Geist von seinem!!!!!!\n"
+                                       + "Urquell ab, Und fuehr ihn, kannst du 
ihn erfassen, Auf deinem Wege mit!!!!!!\n"
+                                       + "herab, Und steh beschaemt, wenn du 
bekennen musst: Ein guter Mensch, in!!!!!!\n"
+                                       + "seinem dunklen Drange, Ist sich des 
rechten Weges wohl bewusst.!!!!!!\n"
+                                       + "MEPHISTOPHELES: Schon gut! nur 
dauert es nicht lange. Mir ist fuer meine!!!!!!\n"
+                                       + "Wette gar nicht bange. Wenn ich zu 
meinem Zweck gelange, Erlaubt Ihr mir!!!!!!\n"
+                                       + "Triumph aus voller Brust. Staub soll 
er fressen, und mit Lust, Wie meine!!!!!!\n"
+                                       + "Muhme, die beruehmte 
Schlange.!!!!!!\n"
+                                       + "DER HERR: Du darfst auch da nur frei 
erscheinen; Ich habe deinesgleichen!!!!!!\n"
+                                       + "nie gehasst. Von allen Geistern, die 
verneinen, ist mir der Schalk am!!!!!!\n"
+                                       + "wenigsten zur Last. Des Menschen 
Taetigkeit kann allzu leicht erschlaffen,!!!!!!\n"
+                                       + "er liebt sich bald die unbedingte 
Ruh; Drum geb ich gern ihm den Gesellen!!!!!!\n"
+                                       + "zu, Der reizt und wirkt und muss als 
Teufel schaffen. Doch ihr, die echten!!!!!!\n"
+                                       + "Goettersoehne, Erfreut euch der 
lebendig reichen Schoene! Das Werdende, das!!!!!!\n"
+                                       + "ewig wirkt und lebt, Umfass euch mit 
der Liebe holden Schranken, Und was!!!!!!\n"
+                                       + "in schwankender Erscheinung schwebt, 
Befestigt mit dauernden Gedanken!!!!!!!\n"
+                                       + "(Der Himmel schliesst, die Erzengel 
verteilen sich.)!!!!!!\n"
+                                       + "MEPHISTOPHELES (allein): Von Zeit zu 
Zeit seh ich den Alten gern, Und!!!!!!\n"
+                                       + "huete mich, mit ihm zu brechen. Es 
ist gar huebsch von einem grossen Herrn,!!!!!!\n"
+                                       + "So menschlich mit dem Teufel selbst 
zu sprechen.!!!!!!";
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java
new file mode 100644
index 0000000..dc174e7
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/BoltSplitITCase.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.junit.Test;
+
+public class BoltSplitITCase {
+
+       @Test
+       public void testTopology() throws Exception {
+               SplitStreamBoltLocal.main(new String[] { "0", "/dev/null" });
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
new file mode 100644
index 0000000..c7b9c1d
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBolt.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+public class SplitBolt extends BaseRichBolt {
+       private static final long serialVersionUID = -6627606934204267173L;
+
+       public static final String EVEN_STREAM = "even";
+       public static final String ODD_STREAM = "odd";
+
+       private OutputCollector collector;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) {
+               this.collector = collector;
+
+       }
+
+       @Override
+       public void execute(Tuple input) {
+               if (input.getInteger(0) % 2 == 0) {
+                       this.collector.emit(EVEN_STREAM, new 
Values(input.getInteger(0)));
+               } else {
+                       this.collector.emit(ODD_STREAM, new 
Values(input.getInteger(0)));
+               }
+       }
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               Fields schema = new Fields("number");
+               declarer.declareStream(EVEN_STREAM, schema);
+               declarer.declareStream(ODD_STREAM, schema);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
new file mode 100644
index 0000000..d0973cb
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.split.operators.RandomSpout;
+import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
+import org.apache.flink.storm.util.OutputFormatter;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+
+public class SplitBoltTopology {
+       public final static String spoutId = "randomSource";
+       public final static String boltId = "splitBolt";
+       public final static String evenVerifierId = "evenVerifier";
+       public final static String oddVerifierId = "oddVerifier";
+       public final static String sinkId = "sink";
+       private final static OutputFormatter formatter = new 
TupleOutputFormatter();
+
+       public static FlinkTopologyBuilder buildTopology() {
+               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+               builder.setSpout(spoutId, new RandomSpout(false, seed));
+               builder.setBolt(boltId, new 
SplitBolt()).shuffleGrouping(spoutId);
+               builder.setBolt(evenVerifierId, new 
VerifyAndEnrichBolt(true)).shuffleGrouping(boltId,
+                               SplitBolt.EVEN_STREAM);
+               builder.setBolt(oddVerifierId, new 
VerifyAndEnrichBolt(false)).shuffleGrouping(boltId,
+                               SplitBolt.ODD_STREAM);
+
+               // emit result
+               if (outputPath != null) {
+                       // read the text file from given input path
+                       final String[] tokens = outputPath.split(":");
+                       final String outputFile = tokens[tokens.length - 1];
+                       builder.setBolt(sinkId, new BoltFileSink(outputFile, 
formatter))
+                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+               } else {
+                       builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
+                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+               }
+
+               return builder;
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static long seed = System.currentTimeMillis();
+       private static String outputPath = null;
+
+       static boolean parseParameters(final String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       if (args.length == 2) {
+                               seed = Long.parseLong(args[0]);
+                               outputPath = args[1];
+                       } else {
+                               System.err.println("Usage: SplitStreamBoltLocal 
<seed> <result path>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing SplitBoltTopology example 
with random data");
+                       System.out.println("  Usage: SplitStreamBoltLocal 
<seed> <result path>");
+               }
+
+               return true;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
new file mode 100644
index 0000000..2b3b6a8
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitSpoutTopology.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.split.operators.RandomSpout;
+import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
+import org.apache.flink.storm.util.OutputFormatter;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+
+public class SplitSpoutTopology {
+       public final static String spoutId = "randomSplitSource";
+       public final static String evenVerifierId = "evenVerifier";
+       public final static String oddVerifierId = "oddVerifier";
+       public final static String sinkId = "sink";
+       private final static OutputFormatter formatter = new 
TupleOutputFormatter();
+
+       public static FlinkTopologyBuilder buildTopology() {
+               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+
+               builder.setSpout(spoutId, new RandomSpout(true, seed));
+               builder.setBolt(evenVerifierId, new 
VerifyAndEnrichBolt(true)).shuffleGrouping(spoutId,
+                               RandomSpout.EVEN_STREAM);
+               builder.setBolt(oddVerifierId, new 
VerifyAndEnrichBolt(false)).shuffleGrouping(spoutId,
+                               RandomSpout.ODD_STREAM);
+
+               // emit result
+               if (outputPath != null) {
+                       // read the text file from given input path
+                       final String[] tokens = outputPath.split(":");
+                       final String outputFile = tokens[tokens.length - 1];
+                       builder.setBolt(sinkId, new BoltFileSink(outputFile, 
formatter))
+                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+               } else {
+                       builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
+                       
.shuffleGrouping(evenVerifierId).shuffleGrouping(oddVerifierId);
+               }
+
+               return builder;
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static long seed = System.currentTimeMillis();
+       private static String outputPath = null;
+
+       static boolean parseParameters(final String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       if (args.length == 2) {
+                               seed = Long.parseLong(args[0]);
+                               outputPath = args[1];
+                       } else {
+                               System.err.println("Usage: 
SplitStreamSpoutLocal <seed> <result path>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing SplitSpoutTopology 
example with random data");
+                       System.out.println("  Usage: SplitStreamSpoutLocal 
<seed> <result path>");
+               }
+
+               return true;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
new file mode 100644
index 0000000..e2c22f9
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamBoltLocal.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+import backtype.storm.utils.Utils;
+
+public class SplitStreamBoltLocal {
+       public final static String topologyId = "Bolt split stream example";
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!SplitBoltTopology.parseParameters(args)) {
+                       return;
+               }
+
+               // build Topology the Storm way
+               final FlinkTopologyBuilder builder = 
SplitBoltTopology.buildTopology();
+
+               // execute program locally
+               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
+               cluster.submitTopology(topologyId, null, 
builder.createTopology());
+
+               Utils.sleep(5 * 1000);
+
+               // TODO kill does no do anything so far
+               cluster.killTopology(topologyId);
+               cluster.shutdown();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
new file mode 100644
index 0000000..2070f66
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitStreamSpoutLocal.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+
+import backtype.storm.utils.Utils;
+
+public class SplitStreamSpoutLocal {
+       public final static String topologyId = "Spout split stream example";
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!SplitSpoutTopology.parseParameters(args)) {
+                       return;
+               }
+
+               // build Topology the Storm way
+               final FlinkTopologyBuilder builder = 
SplitSpoutTopology.buildTopology();
+
+               // execute program locally
+               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
+               cluster.submitTopology(topologyId, null, 
builder.createTopology());
+
+               Utils.sleep(5 * 1000);
+
+               // TODO kill does no do anything so far
+               cluster.killTopology(topologyId);
+               cluster.shutdown();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java
new file mode 100644
index 0000000..8e0fda9
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SpoutSplitITCase.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.split;
+
+import org.junit.Test;
+
+public class SpoutSplitITCase {
+
+       @Test
+       public void testTopology() throws Exception {
+               SplitStreamSpoutLocal.main(new String[] { "0", "/dev/null" });
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
new file mode 100644
index 0000000..32dac7b
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/util/StormTestBase.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class for Storm tests.
+ */
+public abstract class StormTestBase extends AbstractTestBase {
+       
+       public static final int DEFAULT_PARALLELISM = 4;
+       
+       public StormTestBase() {
+               this(new Configuration());
+       }
+       
+       public StormTestBase(Configuration config) {
+               super(config, StreamingMode.STREAMING);
+               setTaskManagerNumSlots(DEFAULT_PARALLELISM);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Methods to create the test program and for pre- and post- test work
+       // 
------------------------------------------------------------------------
+
+       protected abstract void testProgram() throws Exception;
+
+       protected void preSubmit() throws Exception {}
+
+       protected void postSubmit() throws Exception {}
+
+       // 
------------------------------------------------------------------------
+       //  Test entry point
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testJob() throws Exception {
+               try {
+                       // pre-submit
+                       try {
+                               preSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               fail("Pre-submit work caused an error: " + 
e.getMessage());
+                       }
+
+                       // prepare the test environment
+                       startCluster();
+
+                       // we need to initialize the stream test environment, 
and the storm local cluster
+                       TestStreamEnvironment.setAsContext(this.executor, 
DEFAULT_PARALLELISM);
+                       
+                       FlinkLocalCluster.initialize(new 
FlinkLocalCluster.LocalClusterFactory() {
+                               @Override
+                               public FlinkLocalCluster createLocalCluster() {
+                                       return new FlinkLocalCluster(executor);
+                               }
+                       });
+
+                       // call the test program
+                       try {
+                               testProgram();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               fail("Error while calling the test program: " + 
e.getMessage());
+                       }
+
+                       // post-submit
+                       try {
+                               postSubmit();
+                       }
+                       catch (Exception e) {
+                               System.err.println(e.getMessage());
+                               e.printStackTrace();
+                               fail("Post-submit work caused an error: " + 
e.getMessage());
+                       }
+               }
+               finally {
+                       // reset the FlinkLocalCluster to its default behavior
+                       FlinkLocalCluster.initialize(new 
FlinkLocalCluster.DefaultLocalClusterFactory());
+                       
+                       // reset the StreamExecutionEnvironment to its default 
behavior
+                       TestStreamEnvironment.unsetAsContext();
+                       
+                       // clean up all resources
+                       stopCluster();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
new file mode 100644
index 0000000..62d23ab
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.BoltTokenizerWordCount;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               BoltTokenizerWordCount.main(new String[]{this.textPath, 
this.resultPath});
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
new file mode 100644
index 0000000..009bdc2
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojoITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.BoltTokenizerWordCountPojo;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountPojoITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               BoltTokenizerWordCountPojo.main(new String[]{this.textPath, 
this.resultPath});
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
new file mode 100644
index 0000000..321015b
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNamesITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.BoltTokenizerWordCountWithNames;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class BoltTokenizerWordCountWithNamesITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               BoltTokenizerWordCountWithNames.main(new 
String[]{this.textPath, this.resultPath});
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
new file mode 100644
index 0000000..0cff211
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/SpoutSourceWordCountITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.SpoutSourceWordCount;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class SpoutSourceWordCountITCase extends StormTestBase {
+
+       protected String textPath;
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.textPath = this.createTempFile("text.txt", 
WordCountData.TEXT);
+               this.resultPath = this.getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               SpoutSourceWordCount.main(new String[]{this.textPath, 
this.resultPath});
+       }
+
+}

Reply via email to