[FLINK-2837][storm] various improvements for the compatibility layer

- refactor to use Storm's topology builder
- remove FlinkTopologyBuilder
- instantiate context-based StreamExecutionEnvironment (local or remote)
- remove some of the Flink and Storm behavior replicating classes
- modify FlinkTopology to parse Storm topology directly
- replace StormTestBase with StreamingTestBase
- add print example
- FlinkTopologyBuilder changes (check if all inputs are available before 
processing)
- correct package typo
- two input support
- add join example
- update docs

This closes #1398.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/88636799
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/88636799
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/88636799

Branch: refs/heads/master
Commit: 88636799ee68cae80b093436ff2e5eace0675b95
Parents: 20fe2af
Author: Maximilian Michels <m...@apache.org>
Authored: Thu Nov 12 14:39:45 2015 +0100
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Dec 2 10:39:20 2015 +0100

----------------------------------------------------------------------
 docs/apis/storm_compatibility.md                |  12 +-
 flink-contrib/flink-storm-examples/pom.xml      |  10 +-
 .../storm/excamation/ExclamationLocal.java      |  72 ---
 .../storm/excamation/ExclamationTopology.java   | 120 -----
 .../storm/excamation/ExclamationWithBolt.java   | 141 ------
 .../storm/excamation/ExclamationWithSpout.java  | 147 ------
 .../excamation/operators/ExclamationBolt.java   |  75 ---
 .../storm/exclamation/ExclamationLocal.java     |  74 +++
 .../storm/exclamation/ExclamationTopology.java  | 120 +++++
 .../storm/exclamation/ExclamationWithBolt.java  | 140 ++++++
 .../storm/exclamation/ExclamationWithSpout.java | 146 ++++++
 .../exclamation/operators/ExclamationBolt.java  |  75 +++
 .../flink/storm/join/SingleJoinExample.java     |  88 ++++
 .../flink/storm/print/PrintSampleStream.java    |  61 +++
 .../flink/storm/util/FiniteFileSpout.java       |   2 -
 .../flink/storm/util/FiniteInMemorySpout.java   |   3 -
 .../storm/wordcount/BoltTokenizerWordCount.java |   1 -
 .../wordcount/BoltTokenizerWordCountPojo.java   |   1 -
 .../BoltTokenizerWordCountWithNames.java        |   1 -
 .../flink/storm/wordcount/WordCountLocal.java   |  13 +-
 .../storm/wordcount/WordCountLocalByName.java   |  13 +-
 .../wordcount/WordCountRemoteByClient.java      |   7 +-
 .../wordcount/WordCountRemoteBySubmitter.java   |   7 +-
 .../storm/wordcount/WordCountTopology.java      |  11 +-
 .../operators/WordCountInMemorySpout.java       |   7 +-
 .../exclamation/ExclamationWithBoltITCase.java  |   5 +-
 .../exclamation/ExclamationWithSpoutITCase.java |   5 +-
 .../StormExclamationLocalITCase.java            |   5 +-
 .../flink/storm/join/SingleJoinITCase.java      |  58 +++
 .../flink/storm/split/SplitBoltTopology.java    |   8 +-
 .../flink/storm/split/SplitSpoutTopology.java   |   8 +-
 .../flink/storm/split/SplitStreamBoltLocal.java |  16 +-
 .../storm/split/SplitStreamSpoutLocal.java      |  16 +-
 .../storm/tests/StormFieldsGroupingITCase.java  |  11 +-
 .../flink/storm/tests/operators/TaskIdBolt.java |   3 +
 .../apache/flink/storm/util/StormTestBase.java  | 117 -----
 .../wordcount/BoltTokenizerWordCountITCase.java |   5 +-
 .../BoltTokenizerWordCountPojoITCase.java       |   5 +-
 .../BoltTokenizerWordCountWithNamesITCase.java  |   5 +-
 .../wordcount/SpoutSourceWordCountITCase.java   |   5 +-
 .../storm/wordcount/WordCountLocalITCase.java   |   5 +-
 .../wordcount/WordCountLocalNamedITCase.java    |   6 +-
 .../src/test/resources/log4j-test.properties    |   2 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   4 +-
 .../flink/storm/api/FlinkLocalCluster.java      |  23 +-
 .../storm/api/FlinkOutputFieldsDeclarer.java    |   1 -
 .../apache/flink/storm/api/FlinkTopology.java   | 483 +++++++++++++++++--
 .../flink/storm/api/FlinkTopologyBuilder.java   | 421 ----------------
 .../apache/flink/storm/util/StormConfig.java    |   7 +-
 .../flink/storm/util/StormStreamSelector.java   |   8 +-
 .../flink/storm/wrappers/BoltWrapper.java       |  89 ++--
 .../storm/wrappers/BoltWrapperTwoInput.java     | 134 +++++
 .../storm/wrappers/FlinkTopologyContext.java    |   3 +-
 .../wrappers/SetupOutputFieldsDeclarer.java     |   4 +-
 .../flink/storm/wrappers/SpoutCollector.java    |   3 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |   8 +-
 .../apache/flink/storm/wrappers/StormTuple.java |  54 ++-
 .../storm/wrappers/WrapperSetupHelper.java      |  23 +-
 .../api/FlinkOutputFieldsDeclarerTest.java      |   2 -
 .../storm/api/FlinkTopologyBuilderTest.java     |  81 ----
 .../flink/storm/api/FlinkTopologyTest.java      |  73 ++-
 .../org/apache/flink/storm/api/TestBolt.java    |   4 +-
 .../org/apache/flink/storm/api/TestSpout.java   |   4 +-
 .../flink/storm/api/TestTopologyBuilder.java    |  29 --
 .../storm/util/StormStreamSelectorTest.java     |   6 +-
 .../apache/flink/storm/util/TestDummyBolt.java  |   4 +-
 .../apache/flink/storm/util/TestDummySpout.java |   4 +-
 .../org/apache/flink/storm/util/TestSink.java   |   8 +-
 .../flink/storm/wrappers/BoltCollectorTest.java |   2 -
 .../flink/storm/wrappers/BoltWrapperTest.java   |  11 +-
 .../wrappers/FlinkTopologyContextTest.java      |   6 +-
 .../wrappers/SetupOutputFieldsDeclarerTest.java |   2 -
 .../storm/wrappers/SpoutCollectorTest.java      |   2 -
 .../flink/storm/wrappers/SpoutWrapperTest.java  |   1 -
 .../flink/storm/wrappers/StormTupleTest.java    |  37 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  |  36 +-
 76 files changed, 1666 insertions(+), 1543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
index 103b605..fe6bf35 100644
--- a/docs/apis/storm_compatibility.md
+++ b/docs/apis/storm_compatibility.md
@@ -57,20 +57,18 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` 
for an example how t
 
 Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that 
offers replacements for the following classes:
 
-- `TopologyBuilder` replaced by `FlinkTopologyBuilder`
 - `StormSubmitter` replaced by `FlinkSubmitter`
 - `NimbusClient` and `Client` replaced by `FlinkClient`
 - `LocalCluster` replaced by `FlinkLocalCluster`
 
 In order to submit a Storm topology to Flink, it is sufficient to replace the 
used Storm classes with their Flink replacements in the Storm *client code that 
assembles* the topology.
-The actual runtime code, ie, Spouts and Bolts, can be uses *unmodified*.
-If a topology is executed in a remote cluster, parameters `nimbus.host` and 
`nimbus.thrift.port` are used as `jobmanger.rpc.address` and 
`jobmanger.rpc.port`, respectively.
-If a parameter is not specified, the value is taken from `flink-conf.yaml`.
+The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*.
+If a topology is executed in a remote cluster, parameters `nimbus.host` and 
`nimbus.thrift.port` are used as `jobmanager.rpc.address` and 
`jobmanager.rpc.port`, respectively.  If a parameter is not specified, the value 
is taken from `flink-conf.yaml`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 ~~~java
-FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: 
TopologyBuilder builder = new FlinkTopology();
+TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
 
 // actual topology assembling code and used Spouts/Bolts can be used as-is
 builder.setSpout("source", new FileSpout(inputFilePath));
@@ -81,12 +79,12 @@ builder.setBolt("sink", new 
BoltFileSink(outputFilePath)).shuffleGrouping("count
 Config conf = new Config();
 if(runLocal) { // submit to test cluster
        FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: 
LocalCluster cluster = new LocalCluster();
-       cluster.submitTopology("WordCount", conf, builder.createTopology());
+       cluster.submitTopology("WordCount", conf, 
FlinkTopology.createTopology(builder));
 } else { // submit to remote cluster
        // optional
        // conf.put(Config.NIMBUS_HOST, "remoteHost");
        // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-       FlinkSubmitter.submitTopology("WordCount", conf, 
builder.createTopology()); // replaces: 
StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+       FlinkSubmitter.submitTopology("WordCount", conf, 
FlinkTopology.createTopology(builder)); // replaces: 
StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
 }
 ~~~
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/pom.xml 
b/flink-contrib/flink-storm-examples/pom.xml
index cee6cac..6f3a050 100644
--- a/flink-contrib/flink-storm-examples/pom.xml
+++ b/flink-contrib/flink-storm-examples/pom.xml
@@ -61,6 +61,14 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.storm</groupId>
+                       <artifactId>storm-starter</artifactId>
+                       <version>0.9.4</version>
+               </dependency>
+
+
        </dependencies>
 
        <build>
@@ -226,7 +234,7 @@ under the License.
                                        </execution>
 
                                        <!-- WordCount Storm topology-->
-                                       <!-- Example for whole topologies (ie, 
if FlinkTopologyBuilder is used) -->
+                                       <!-- Example for whole topologies (ie, 
if FlinkTopology is used) -->
                                        <!-- We cannot use maven-jar-plugin 
because 'defaults.yaml' must be included in jar.
                                             However, we excluded 
'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
                                             Thus, 'defaults.yaml' is not 
available for maven-jar-plugin.

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
deleted file mode 100644
index 56a0125..0000000
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationLocal.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.excamation;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
-import org.apache.flink.storm.excamation.operators.ExclamationBolt;
-
-/**
- * Implements the "Exclamation" program that attaches five exclamation mark to 
every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology} and submitted to
- * Flink for execution in the same way as to a Storm {@link 
backtype.storm.LocalCluster}.
- * <p>
- * This example shows how to run program directly within Java, thus it cannot 
be used to submit a
- * {@link backtype.storm.generated.StormTopology} via Flink command line 
clients (ie, bin/flink).
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>ExclamationLocal &lt;text path&gt; &lt;result 
path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
- * <p>
- * This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink</li>
- * </ul>
- */
-public class ExclamationLocal {
-
-       public final static String topologyId = "Streaming Exclamation";
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!ExclamationTopology.parseParameters(args)) {
-                       return;
-               }
-
-               // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
ExclamationTopology.buildTopology();
-
-               // execute program locally
-               Config conf = new Config();
-               conf.put(ExclamationBolt.EXCLAMATION_COUNT, 
ExclamationTopology.getExclamation());
-               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
-               cluster.submitTopology(topologyId, conf, 
builder.createTopology());
-
-               Utils.sleep(10 * 1000);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
deleted file mode 100644
index 9d94f5c..0000000
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationTopology.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.excamation;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
-import org.apache.flink.storm.excamation.operators.ExclamationBolt;
-import org.apache.flink.storm.util.FiniteFileSpout;
-import org.apache.flink.storm.util.FiniteInMemorySpout;
-import org.apache.flink.storm.util.OutputFormatter;
-import org.apache.flink.storm.util.SimpleOutputFormatter;
-import org.apache.flink.storm.util.BoltFileSink;
-import org.apache.flink.storm.util.BoltPrintSink;
-
-/**
- * Implements the "Exclamation" program that attaches two exclamation marks to 
every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] &lt;text 
path&gt;
- * &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p>
- * This example shows how to:
- * <ul>
- * <li>construct a regular Storm topology as Flink program</li>
- * <li>make use of the FiniteSpout interface</li>
- * </ul>
- */
-public class ExclamationTopology {
-
-       public final static String spoutId = "source";
-       public final static String firstBoltId = "exclamation1";
-       public final static String secondBoltId = "exclamation2";
-       public final static String sinkId = "sink";
-       private final static OutputFormatter formatter = new 
SimpleOutputFormatter();
-
-       public static FlinkTopologyBuilder buildTopology() {
-               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-               // get input data
-               if (fileInputOutput) {
-                       // read the text file from given input path
-                       final String[] tokens = textPath.split(":");
-                       final String inputFile = tokens[tokens.length - 1];
-                       builder.setSpout(spoutId, new 
FiniteFileSpout(inputFile));
-               } else {
-                       builder.setSpout(spoutId, new 
FiniteInMemorySpout(WordCountData.WORDS));
-               }
-
-               builder.setBolt(firstBoltId, new ExclamationBolt(), 
3).shuffleGrouping(spoutId);
-               builder.setBolt(secondBoltId, new ExclamationBolt(), 
2).shuffleGrouping(firstBoltId);
-
-               // emit result
-               if (fileInputOutput) {
-                       // read the text file from given input path
-                       final String[] tokens = outputPath.split(":");
-                       final String outputFile = tokens[tokens.length - 1];
-                       builder.setBolt(sinkId, new BoltFileSink(outputFile, 
formatter))
-                       .shuffleGrouping(secondBoltId);
-               } else {
-                       builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
-                       .shuffleGrouping(secondBoltId);
-               }
-
-               return builder;
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileInputOutput = false;
-       private static String textPath;
-       private static String outputPath;
-       private static int exclamationNum = 3;
-
-       static int getExclamation() {
-               return exclamationNum;
-       }
-
-       static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileInputOutput = true;
-                       if (args.length == 3) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                               exclamationNum = Integer.parseInt(args[2]);
-                       } else {
-                               System.err.println("Usage: StormExclamation* 
<text path> <result path>  <number of exclamation marks>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing StormExclamation example 
with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: StormExclamation <text 
path> <result path> <number of exclamation marks>");
-               }
-
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
deleted file mode 100644
index 19fe977..0000000
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithBolt.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.excamation.operators.ExclamationBolt;
-import org.apache.flink.storm.util.StormConfig;
-import org.apache.flink.storm.wrappers.BoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import backtype.storm.utils.Utils;
-
-/**
- * Implements the "Exclamation" program that attaches 3+x exclamation marks to 
every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage:
- * <code>ExclamationWithmBolt &lt;text path&gt; &lt;result path&gt; &lt;number 
of exclamation marks&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData} with x=2.
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use a Bolt within a Flink Streaming program</li>
- * <li>how to configure a Bolt using StormConfig</li>
- * </ul>
- */
-public class ExclamationWithBolt {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // set Storm configuration
-               StormConfig config = new StormConfig();
-               config.put(ExclamationBolt.EXCLAMATION_COUNT, new 
Integer(exclamationNum));
-               env.getConfig().setGlobalJobParameters(config);
-
-               // get input data
-               final DataStream<String> text = getTextDataStream(env);
-
-               final DataStream<String> exclaimed = text
-                               .transform("StormBoltTokenizer",
-                                               TypeExtractor.getForObject(""),
-                                               new BoltWrapper<String, 
String>(new ExclamationBolt(),
-                                                               new String[] { 
Utils.DEFAULT_STREAM_ID }))
-                                                               .map(new 
ExclamationMap());
-
-               // emit result
-               if (fileOutput) {
-                       exclaimed.writeAsText(outputPath);
-               } else {
-                       exclaimed.print();
-               }
-
-               // execute program
-               env.execute("Streaming WordCount with bolt tokenizer");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       private static class ExclamationMap implements MapFunction<String, 
String> {
-               private static final long serialVersionUID = 
4614754344067170619L;
-
-               @Override
-               public String map(String value) throws Exception {
-                       return value + "!!!";
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-       private static int exclamationNum = 2;
-
-       private static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 3) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                               exclamationNum = Integer.parseInt(args[2]);
-                       } else {
-                               System.err.println("Usage: ExclamationWithBolt 
<text path> <result path> <number of exclamation marks>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing ExclamationWithBolt 
example with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: ExclamationWithBolt <text 
path> <result path> <number of exclamation marks>");
-               }
-               return true;
-       }
-
-       private static DataStream<String> getTextDataStream(final 
StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       return env.readTextFile(textPath);
-               }
-
-               return env.fromElements(WordCountData.WORDS);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
deleted file mode 100644
index a196995..0000000
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/ExclamationWithSpout.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.excamation;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.util.FiniteFileSpout;
-import org.apache.flink.storm.util.FiniteInMemorySpout;
-import org.apache.flink.storm.util.StormConfig;
-import org.apache.flink.storm.wrappers.SpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import backtype.storm.utils.Utils;
-
-/**
- * Implements the "Exclamation" program that attaches six exclamation marks to 
every line of a text files in a streaming
- * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology}.
- * <p>
- * The input is a plain text file with lines separated by newline characters.
- * <p>
- * Usage: <code>ExclamationWithSpout &lt;text path&gt; &lt;result 
path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p>
- * This example shows how to:
- * <ul>
- * <li>use a Storm spout within a Flink Streaming program</li>
- * <li>make use of the FiniteSpout interface</li>
- * <li>make use of the FiniteSpout interface</li>
- * <li>how to configure a Spout using StormConfig</li>
- * </ul>
- */
-public class ExclamationWithSpout {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               final DataStream<String> text = getTextDataStream(env);
-
-               final DataStream<String> exclaimed = text
-                               .map(new ExclamationMap())
-                               .map(new ExclamationMap());
-
-               // emit result
-               if (fileOutput) {
-                       exclaimed.writeAsText(outputPath);
-               } else {
-                       exclaimed.print();
-               }
-
-               // execute program
-               env.execute("Streaming Exclamation with Storm spout source");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       private static class ExclamationMap implements MapFunction<String, 
String> {
-               private static final long serialVersionUID = 
-684993133807698042L;
-
-               @Override
-               public String map(String value) throws Exception {
-                       return value + "!!!";
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: ExclamationWithSpout 
<text path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing ExclamationWithSpout 
example with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: ExclamationWithSpout <text 
path> <result path>");
-               }
-               return true;
-       }
-
-       private static DataStream<String> getTextDataStream(final 
StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       final String[] tokens = textPath.split(":");
-                       final String inputFile = tokens[tokens.length - 1];
-
-                       // set Storm configuration
-                       StormConfig config = new StormConfig();
-                       config.put(FiniteFileSpout.INPUT_FILE_PATH, inputFile);
-                       env.getConfig().setGlobalJobParameters(config);
-
-                       return env.addSource(
-                                       new SpoutWrapper<String>(new 
FiniteFileSpout(),
-                                                       new String[] { 
Utils.DEFAULT_STREAM_ID }),
-                                                       
TypeExtractor.getForClass(String.class)).setParallelism(1);
-               }
-
-               return env.addSource(
-                               new SpoutWrapper<String>(new 
FiniteInMemorySpout(
-                                               WordCountData.WORDS), new 
String[] { Utils.DEFAULT_STREAM_ID }),
-                                               
TypeExtractor.getForClass(String.class)).setParallelism(1);
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
deleted file mode 100644
index cfc49a1..0000000
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/excamation/operators/ExclamationBolt.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.storm.excamation.operators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-public class ExclamationBolt implements IRichBolt {
-       private final static long serialVersionUID = -6364882114201311380L;
-
-       public final static String EXCLAMATION_COUNT = "exclamation.count";
-
-       private OutputCollector collector;
-       private String exclamation;
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
-               this.collector = collector;
-
-               Object count = conf.get(EXCLAMATION_COUNT);
-               if (count != null) {
-                       int exclamationNum = (Integer) count;
-                       StringBuilder builder = new StringBuilder();
-                       for (int index = 0; index < exclamationNum; ++index) {
-                               builder.append('!');
-                       }
-                       this.exclamation = builder.toString();
-               } else {
-                       this.exclamation = "!";
-               }
-       }
-
-       @Override
-       public void cleanup() {
-       }
-
-       @Override
-       public void execute(Tuple tuple) {
-               collector.emit(tuple, new Values(tuple.getString(0) + 
this.exclamation));
-       }
-
-       @Override
-       public void declareOutputFields(OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields("word"));
-       }
-
-       @Override
-       public Map<String, Object> getComponentConfiguration() {
-               return null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
new file mode 100644
index 0000000..3df4290
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationLocal.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
+
+/**
+ * Implements the "Exclamation" program that attaches five exclamation marks to 
every line of a text file in a streaming
+ * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology} and submitted to
+ * Flink for execution in the same way as to a Storm {@link 
backtype.storm.LocalCluster}.
+ * <p>
+ * This example shows how to run a program directly within Java; thus, it cannot 
be used to submit a
+ * {@link backtype.storm.generated.StormTopology} via Flink command line 
clients (i.e., bin/flink).
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p>
+ * Usage: <code>ExclamationLocal &lt;text path&gt; &lt;result path&gt; 
&lt;number of exclamation marks&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.examples.java.wordcount.util.WordCountData}.
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>run a regular Storm program locally on Flink</li>
+ * </ul>
+ */
+public class ExclamationLocal {
+
+       public final static String topologyId = "Streaming Exclamation";
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!ExclamationTopology.parseParameters(args)) {
+                       return;
+               }
+
+               // build Topology the Storm way
+               final TopologyBuilder builder = 
ExclamationTopology.buildTopology();
+
+               // execute program locally
+               Config conf = new Config();
+               conf.put(ExclamationBolt.EXCLAMATION_COUNT, 
ExclamationTopology.getExclamation());
+
+               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
+               cluster.submitTopology(topologyId, conf, 
FlinkTopology.createTopology(builder));
+
+               Utils.sleep(10 * 1000);
+               cluster.shutdown();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
new file mode 100644
index 0000000..43f526b
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationTopology.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import backtype.storm.topology.TopologyBuilder;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.FiniteFileSpout;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.flink.storm.util.OutputFormatter;
+import org.apache.flink.storm.util.SimpleOutputFormatter;
+
+/**
+ * Implements the "Exclamation" program that attaches two exclamation marks to 
every line of a text file in a streaming
+ * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology}.
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p>
+ * Usage: <code>Exclamation[Local|RemoteByClient|RemoteBySubmitter] &lt;text 
path&gt;
+ * &lt;result path&gt; &lt;number of exclamation marks&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>construct a regular Storm topology as Flink program</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * </ul>
+ */
+public class ExclamationTopology {
+
+       public final static String spoutId = "source";
+       public final static String firstBoltId = "exclamation1";
+       public final static String secondBoltId = "exclamation2";
+       public final static String sinkId = "sink";
+       private final static OutputFormatter formatter = new 
SimpleOutputFormatter();
+
+       public static TopologyBuilder buildTopology() {
+               final TopologyBuilder builder = new TopologyBuilder();
+
+               // get input data
+               if (fileInputOutput) {
+                       // read the text file from given input path
+                       final String[] tokens = textPath.split(":");
+                       final String inputFile = tokens[tokens.length - 1];
+                       builder.setSpout(spoutId, new 
FiniteFileSpout(inputFile));
+               } else {
+                       builder.setSpout(spoutId, new 
FiniteInMemorySpout(WordCountData.WORDS));
+               }
+
+               builder.setBolt(firstBoltId, new ExclamationBolt(), 
3).shuffleGrouping(spoutId);
+               builder.setBolt(secondBoltId, new ExclamationBolt(), 
2).shuffleGrouping(firstBoltId);
+
+               // emit result
+               if (fileInputOutput) {
+                       // write the result to the given output path
+                       final String[] tokens = outputPath.split(":");
+                       final String outputFile = tokens[tokens.length - 1];
+                       builder.setBolt(sinkId, new BoltFileSink(outputFile, 
formatter))
+                       .shuffleGrouping(secondBoltId);
+               } else {
+                       builder.setBolt(sinkId, new BoltPrintSink(formatter), 4)
+                       .shuffleGrouping(secondBoltId);
+               }
+
+               return builder;
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileInputOutput = false;
+       private static String textPath;
+       private static String outputPath;
+       private static int exclamationNum = 3;
+
+       static int getExclamation() {
+               return exclamationNum;
+       }
+
+       static boolean parseParameters(final String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileInputOutput = true;
+                       if (args.length == 3) {
+                               textPath = args[0];
+                               outputPath = args[1];
+                               exclamationNum = Integer.parseInt(args[2]);
+                       } else {
+                               System.err.println("Usage: StormExclamation* 
<text path> <result path>  <number of exclamation marks>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing StormExclamation example 
with built-in default data");
+                       System.out.println("  Provide parameters to read input 
data from a file");
+                       System.out.println("  Usage: StormExclamation <text 
path> <result path> <number of exclamation marks>");
+               }
+
+               return true;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
new file mode 100644
index 0000000..b47c0fa
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithBolt.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "Exclamation" program that attaches 3+x exclamation marks to 
every line of a text file in a streaming
+ * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology}.
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p>
+ * Usage:
+ * <code>ExclamationWithBolt &lt;text path&gt; &lt;result path&gt; &lt;number 
of exclamation marks&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData} with x=2.
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Bolt within a Flink Streaming program</li>
+ * <li>how to configure a Bolt using StormConfig</li>
+ * </ul>
+ */
+public class ExclamationWithBolt {
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // set Storm configuration
+               StormConfig config = new StormConfig();
+               config.put(ExclamationBolt.EXCLAMATION_COUNT, new 
Integer(exclamationNum));
+               env.getConfig().setGlobalJobParameters(config);
+
+               // get input data
+               final DataStream<String> text = getTextDataStream(env);
+
+               final DataStream<String> exclaimed = text
+                               .transform("StormBoltTokenizer",
+                                               TypeExtractor.getForObject(""),
+                                               new BoltWrapper<String, 
String>(new ExclamationBolt(),
+                                                               new String[] { 
Utils.DEFAULT_STREAM_ID }))
+                                                               .map(new 
ExclamationMap());
+
+               // emit result
+               if (fileOutput) {
+                       exclaimed.writeAsText(outputPath);
+               } else {
+                       exclaimed.print();
+               }
+
+               // execute program
+               env.execute("Streaming WordCount with bolt tokenizer");
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       private static class ExclamationMap implements MapFunction<String, 
String> {
+               private static final long serialVersionUID = 
4614754344067170619L;
+
+               @Override
+               public String map(String value) throws Exception {
+                       return value + "!!!";
+               }
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String textPath;
+       private static String outputPath;
+       private static int exclamationNum = 2;
+
+       private static boolean parseParameters(final String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+                       if (args.length == 3) {
+                               textPath = args[0];
+                               outputPath = args[1];
+                               exclamationNum = Integer.parseInt(args[2]);
+                       } else {
+                               System.err.println("Usage: ExclamationWithBolt 
<text path> <result path> <number of exclamation marks>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing ExclamationWithBolt 
example with built-in default data");
+                       System.out.println("  Provide parameters to read input 
data from a file");
+                       System.out.println("  Usage: ExclamationWithBolt <text 
path> <result path> <number of exclamation marks>");
+               }
+               return true;
+       }
+
+       private static DataStream<String> getTextDataStream(final 
StreamExecutionEnvironment env) {
+               if (fileOutput) {
+                       // read the text file from given input path
+                       return env.readTextFile(textPath);
+               }
+
+               return env.fromElements(WordCountData.WORDS);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
new file mode 100644
index 0000000..380d9da
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/ExclamationWithSpout.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation;
+
+import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.util.FiniteFileSpout;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Implements the "Exclamation" program that attaches six exclamation marks to 
every line of a text file in a streaming
+ * fashion. The program is constructed as a regular {@link 
backtype.storm.generated.StormTopology}.
+ * <p>
+ * The input is a plain text file with lines separated by newline characters.
+ * <p>
+ * Usage: <code>ExclamationWithSpout &lt;text path&gt; &lt;result 
path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
+ * <p>
+ * This example shows how to:
+ * <ul>
+ * <li>use a Storm spout within a Flink Streaming program</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * <li>make use of the FiniteSpout interface</li>
+ * <li>how to configure a Spout using StormConfig</li>
+ * </ul>
+ */
+public class ExclamationWithSpout {
+
+       // 
*************************************************************************
+       // PROGRAM
+       // 
*************************************************************************
+
+       public static void main(final String[] args) throws Exception {
+
+               if (!parseParameters(args)) {
+                       return;
+               }
+
+               // set up the execution environment
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               // get input data
+               final DataStream<String> text = getTextDataStream(env);
+
+               final DataStream<String> exclaimed = text
+                               .map(new ExclamationMap())
+                               .map(new ExclamationMap());
+
+               // emit result
+               if (fileOutput) {
+                       exclaimed.writeAsText(outputPath);
+               } else {
+                       exclaimed.print();
+               }
+
+               // execute program
+               env.execute("Streaming Exclamation with Storm spout source");
+       }
+
+       // 
*************************************************************************
+       // USER FUNCTIONS
+       // 
*************************************************************************
+
+       private static class ExclamationMap implements MapFunction<String, 
String> {
+               private static final long serialVersionUID = 
-684993133807698042L;
+
+               @Override
+               public String map(String value) throws Exception {
+                       return value + "!!!";
+               }
+       }
+
+       // 
*************************************************************************
+       // UTIL METHODS
+       // 
*************************************************************************
+
+       private static boolean fileOutput = false;
+       private static String textPath;
+       private static String outputPath;
+
+       private static boolean parseParameters(final String[] args) {
+
+               if (args.length > 0) {
+                       // parse input arguments
+                       fileOutput = true;
+                       if (args.length == 2) {
+                               textPath = args[0];
+                               outputPath = args[1];
+                       } else {
+                               System.err.println("Usage: ExclamationWithSpout 
<text path> <result path>");
+                               return false;
+                       }
+               } else {
+                       System.out.println("Executing ExclamationWithSpout 
example with built-in default data");
+                       System.out.println("  Provide parameters to read input 
data from a file");
+                       System.out.println("  Usage: ExclamationWithSpout <text 
path> <result path>");
+               }
+               return true;
+       }
+
+       private static DataStream<String> getTextDataStream(final 
StreamExecutionEnvironment env) {
+               if (fileOutput) {
+                       final String[] tokens = textPath.split(":");
+                       final String inputFile = tokens[tokens.length - 1];
+
+                       // set Storm configuration
+                       StormConfig config = new StormConfig();
+                       config.put(FiniteFileSpout.INPUT_FILE_PATH, inputFile);
+                       env.getConfig().setGlobalJobParameters(config);
+
+                       return env.addSource(
+                                       new SpoutWrapper<String>(new 
FiniteFileSpout(),
+                                                       new String[] { 
Utils.DEFAULT_STREAM_ID }),
+                                                       
TypeExtractor.getForClass(String.class)).setParallelism(1);
+               }
+
+               return env.addSource(
+                               new SpoutWrapper<String>(new 
FiniteInMemorySpout(
+                                               WordCountData.WORDS), new 
String[] { Utils.DEFAULT_STREAM_ID }),
+                                               
TypeExtractor.getForClass(String.class)).setParallelism(1);
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
new file mode 100644
index 0000000..9bc00d2
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/exclamation/operators/ExclamationBolt.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.exclamation.operators;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class ExclamationBolt implements IRichBolt {
+       private final static long serialVersionUID = -6364882114201311380L;
+
+       public final static String EXCLAMATION_COUNT = "exclamation.count";
+
+       private OutputCollector collector;
+       private String exclamation;
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(Map conf, TopologyContext context, OutputCollector 
collector) {
+               this.collector = collector;
+
+               Object count = conf.get(EXCLAMATION_COUNT);
+               if (count != null) {
+                       int exclamationNum = (Integer) count;
+                       StringBuilder builder = new StringBuilder();
+                       for (int index = 0; index < exclamationNum; ++index) {
+                               builder.append('!');
+                       }
+                       this.exclamation = builder.toString();
+               } else {
+                       this.exclamation = "!";
+               }
+       }
+
+       @Override
+       public void cleanup() {
+       }
+
+       @Override
+       public void execute(Tuple tuple) {
+               collector.emit(tuple, new Values(tuple.getString(0) + 
this.exclamation));
+       }
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields("word"));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
new file mode 100644
index 0000000..eaa91ee
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/join/SingleJoinExample.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.join;
+
+import backtype.storm.Config;
+import backtype.storm.testing.FeederSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import org.apache.flink.storm.util.BoltFileSink;
+import org.apache.flink.storm.util.TupleOutputFormatter;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.bolt.SingleJoinBolt;
+
+/** Storm join example run on Flink: a "gender" and an "age" spout, both grouped by "id", feed Storm's SingleJoinBolt. */
+public class SingleJoinExample {
+       /** Builds the topology, pre-feeds both spouts and runs it on a local Flink cluster; args[0] (optional) is the result file. */
+       public static void main(String[] args) throws Exception {
+               final FeederSpout genderSpout = new FeederSpout(new 
Fields("id", "gender", "hobbies"));
+               final FeederSpout ageSpout = new FeederSpout(new Fields("id", 
"age"));
+
+               TopologyBuilder builder = new TopologyBuilder();
+
+               builder.setSpout("gender", genderSpout);
+
+               builder.setSpout("age", ageSpout);
+
+               builder.setBolt("join", new SingleJoinBolt(new Fields("gender", 
"age")))
+                       .fieldsGrouping("gender", new Fields("id"))
+                       .fieldsGrouping("age", new Fields("id"));
+
+               // emit result
+               if (args.length > 0) {
+                       // write the join result to the file path given as first argument
+                       builder.setBolt("fileOutput", new BoltFileSink(args[0], 
new TupleOutputFormatter()))
+                               .shuffleGrouping("join");
+               } else {
+                       builder.setBolt("print", new 
PrinterBolt()).shuffleGrouping("join");
+               }
+
+               Config conf = new Config();
+               conf.setDebug(true);
+               // Feed the spouts before submission: ten gender records (ids 0..9), then ten age records (ids 9..0).
+               String[] hobbies = new String[] {"reading", "biking", 
"travelling", "watching tv"};
+
+               for (int i = 0; i < 10; i++) {
+                       String gender;
+                       if (i % 2 == 0) {
+                               gender = "male";
+                       }
+                       else {
+                               gender = "female";
+                       }
+                       genderSpout.feed(new Values(i, gender, hobbies[i % 
hobbies.length]));
+               }
+
+               for (int i = 9; i >= 0; i--) {
+                       ageSpout.feed(new Values(i, i + 20));
+               }
+
+
+               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
+               cluster.submitTopology("joinTopology", conf, 
FlinkTopology.createTopology(builder));
+               // Let the job run for ten seconds, then shut the local cluster down.
+               Utils.sleep(10 * 1000);
+
+               cluster.shutdown();
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
new file mode 100644
index 0000000..598a8d9
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/print/PrintSampleStream.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.print;
+
+import backtype.storm.Config;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.utils.Utils;
+import org.apache.flink.storm.api.FlinkLocalCluster;
+import org.apache.flink.storm.api.FlinkTopology;
+import storm.starter.bolt.PrinterBolt;
+import storm.starter.spout.TwitterSampleSpout;
+
+import java.util.Arrays;
+
+/**
+ * Prints incoming tweets. Arguments: consumer key, consumer secret, access token, access token secret, then optional filter keywords.
+ */
+public class PrintSampleStream {        
+       public static void main(String[] args) throws Exception {
+               String consumerKey = args[0];
+               String consumerSecret = args[1];
+               String accessToken = args[2];
+               String accessTokenSecret = args[3];
+               // NOTE: args[0..3] are read unchecked, so fewer than four arguments fail with ArrayIndexOutOfBoundsException.
+               // keywords start with the 5th parameter
+               String[] keyWords = Arrays.copyOfRange(args, 4, args.length);
+
+               TopologyBuilder builder = new TopologyBuilder();
+
+               builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, 
consumerSecret,
+                                                               accessToken, 
accessTokenSecret, keyWords));
+               builder.setBolt("print", new PrinterBolt())
+                               .shuffleGrouping("twitter");
+
+
+               Config conf = new Config();
+
+               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
+               cluster.submitTopology("Print", conf, 
FlinkTopology.createTopology(builder));
+               // Let the job run for ten seconds, then shut the local cluster down.
+               Utils.sleep(10 * 1000);
+
+               cluster.shutdown();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
index 75450c4..4b41f8a 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteFileSpout.java
@@ -23,8 +23,6 @@ import backtype.storm.tuple.Values;
 import java.io.IOException;
 import java.util.Map;
 
-import org.apache.flink.storm.util.FiniteSpout;
-
 /**
  * Implements a Spout that reads data from a given local file. The spout stops 
automatically
  * when it reached the end of the file.

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
index 1490872..ff89a41 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/util/FiniteInMemorySpout.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.storm.util;
 
-import org.apache.flink.storm.util.FiniteSpout;
-
-
 /**
  * Implements a Spout that reads String[] data stored in memory. The Spout 
stops automatically when it emitted all of
  * the data.

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
index 6ee7fd9..cccf4c0 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCount.java
@@ -18,7 +18,6 @@
 package org.apache.flink.storm.wordcount;
 
 import backtype.storm.topology.IRichBolt;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
index e093714..9b0d4ee 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java
@@ -18,7 +18,6 @@
 package org.apache.flink.storm.wordcount;
 
 import backtype.storm.topology.IRichBolt;
-
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.io.PojoCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
index f5a1a35..50d4518 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java
@@ -19,7 +19,6 @@ package org.apache.flink.storm.wordcount;
 
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.tuple.Fields;
-
 import org.apache.flink.api.java.io.CsvInputFormat;
 import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
index 0a7dfa0..6da3e3c 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocal.java
@@ -19,11 +19,11 @@ package org.apache.flink.storm.wordcount;
 
 import backtype.storm.LocalCluster;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.api.FlinkTopology;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
@@ -35,7 +35,7 @@ import org.apache.flink.storm.api.FlinkTopologyBuilder;
  * <p>
  * The input is a plain text file with lines separated by newline characters.
  * <p>
- * Usage: <code>WordCountLocal &lt;text path&gt; &lt;result path&gt;</code><br>
+ * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
  * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
  * <p>
  * This example shows how to:
@@ -57,16 +57,13 @@ public class WordCountLocal {
                }
 
                // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
+               final TopologyBuilder builder = 
WordCountTopology.buildTopology();
 
-               // execute program locally
                final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
-               cluster.submitTopology(topologyId, null, 
builder.createTopology());
+               cluster.submitTopology(topologyId, null, 
FlinkTopology.createTopology(builder));
 
                Utils.sleep(10 * 1000);
 
-               // TODO kill does no do anything so far
-               cluster.killTopology(topologyId);
                cluster.shutdown();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
index 67f4bbe..e6fd141 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountLocalByName.java
@@ -19,11 +19,11 @@ package org.apache.flink.storm.wordcount;
 
 import backtype.storm.LocalCluster;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.utils.Utils;
-
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkLocalCluster;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.api.FlinkTopology;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
@@ -58,17 +58,14 @@ public class WordCountLocalByName {
                }
 
                // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology(false);
+               final TopologyBuilder builder = 
WordCountTopology.buildTopology(false);
 
-               // execute program locally
                final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
-               cluster.submitTopology(topologyId, null, 
builder.createTopology());
+               cluster.submitTopology(topologyId, null, 
FlinkTopology.createTopology(builder));
 
                Utils.sleep(10 * 1000);
 
-               // TODO kill does no do anything so far
-               cluster.killTopology(topologyId);
                cluster.shutdown();
-       }
 
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
index 8cb7cdd..c7c7f7c 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteByClient.java
@@ -22,12 +22,13 @@ import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
 import backtype.storm.generated.NotAliveException;
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkClient;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.api.FlinkTopology;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
@@ -63,7 +64,7 @@ public class WordCountRemoteByClient {
                }
 
                // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
+               final TopologyBuilder builder = 
WordCountTopology.buildTopology();
 
                // execute program on Flink cluster
                final Config conf = new Config();
@@ -73,7 +74,7 @@ public class WordCountRemoteByClient {
                conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
 
                final FlinkClient cluster = 
FlinkClient.getConfiguredClient(conf);
-               cluster.submitTopology(topologyId, uploadedJarLocation, 
builder.createTopology());
+               cluster.submitTopology(topologyId, uploadedJarLocation, 
FlinkTopology.createTopology(builder));
 
                Utils.sleep(5 * 1000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
index a20843e..eb2713d 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountRemoteBySubmitter.java
@@ -21,10 +21,11 @@ import backtype.storm.Config;
 import backtype.storm.StormSubmitter;
 import backtype.storm.generated.StormTopology;
 
+import backtype.storm.topology.TopologyBuilder;
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.storm.api.FlinkClient;
 import org.apache.flink.storm.api.FlinkSubmitter;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.storm.api.FlinkTopology;
 
 /**
  * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
@@ -57,7 +58,7 @@ public class WordCountRemoteBySubmitter {
                }
 
                // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
+               final TopologyBuilder builder = 
WordCountTopology.buildTopology();
 
                // execute program on Flink cluster
                final Config conf = new Config();
@@ -71,7 +72,7 @@ public class WordCountRemoteBySubmitter {
                // The user jar file must be specified via JVM argument if 
executed via Java.
                // => -Dstorm.jar=target/WordCount-StormTopology.jar
                // If bin/flink is used, the jar file is detected automatically.
-               FlinkSubmitter.submitTopology(topologyId, conf, 
builder.createTopology());
+               FlinkSubmitter.submitTopology(topologyId, conf, 
FlinkTopology.createTopology(builder));
 
                Thread.sleep(5 * 1000);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
index 138df65..3e7c257 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/WordCountTopology.java
@@ -18,13 +18,12 @@
 package org.apache.flink.storm.wordcount;
 
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
-
 import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
-import org.apache.flink.storm.util.OutputFormatter;
 import org.apache.flink.storm.util.BoltFileSink;
 import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.OutputFormatter;
 import org.apache.flink.storm.util.TupleOutputFormatter;
 import org.apache.flink.storm.wordcount.operators.BoltCounter;
 import org.apache.flink.storm.wordcount.operators.BoltCounterByName;
@@ -55,13 +54,13 @@ public class WordCountTopology {
        public final static String sinkId = "sink";
        private final static OutputFormatter formatter = new 
TupleOutputFormatter();
 
-       public static FlinkTopologyBuilder buildTopology() {
+       public static TopologyBuilder buildTopology() {
                return buildTopology(true);
        }
 
-       public static FlinkTopologyBuilder buildTopology(boolean indexOrName) {
+       public static TopologyBuilder buildTopology(boolean indexOrName) {
 
-               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+               final TopologyBuilder builder = new TopologyBuilder();
 
                // get input data
                if (fileInputOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
index eb96160..c06c268 100644
--- 
a/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
+++ 
b/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/operators/WordCountInMemorySpout.java
@@ -17,16 +17,15 @@
 
 package org.apache.flink.storm.wordcount.operators;
 
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.storm.util.InMemorySpout;
-
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
+import org.apache.flink.examples.java.wordcount.util.WordCountData;
+import org.apache.flink.storm.util.FiniteInMemorySpout;
 
 /**
  * Implements a Spout that reads data from {@link WordCountData#WORDS}.
  */
-public final class WordCountInMemorySpout extends InMemorySpout<String> {
+public final class WordCountInMemorySpout extends FiniteInMemorySpout {
        private static final long serialVersionUID = 8832143302409465843L;
 
        public WordCountInMemorySpout() {

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
index 781396c..5a37572 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithBoltITCase.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.storm.exclamation;
 
-import org.apache.flink.storm.excamation.ExclamationWithBolt;
 import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class ExclamationWithBoltITCase extends StormTestBase {
+public class ExclamationWithBoltITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
index 36b8aed..c2b0467 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/ExclamationWithSpoutITCase.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.storm.exclamation;
 
-import org.apache.flink.storm.excamation.ExclamationWithSpout;
 import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class ExclamationWithSpoutITCase extends StormTestBase {
+public class ExclamationWithSpoutITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
index cec276f..049c881 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/exclamation/StormExclamationLocalITCase.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.storm.exclamation;
 
-import org.apache.flink.storm.excamation.ExclamationLocal;
 import org.apache.flink.storm.exclamation.util.ExclamationData;
-import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
 import org.apache.flink.test.testdata.WordCountData;
 
-public class StormExclamationLocalITCase extends StormTestBase {
+public class StormExclamationLocalITCase extends StreamingProgramTestBase {
 
        protected String textPath;
        protected String resultPath;

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
new file mode 100644
index 0000000..b51db2c
--- /dev/null
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/join/SingleJoinITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.join;
+
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+/** Runs SingleJoinExample with a temp result path and compares the written result against the expected lines. */
+public class SingleJoinITCase extends StreamingProgramTestBase {
+       // Expected result lines: one (gender,age) pair for each of the ids 0..9 that SingleJoinExample feeds.
+       protected static String expectedOutput[] = {
+                       "(male,20)",
+                       "(female,21)",
+                       "(male,22)",
+                       "(female,23)",
+                       "(male,24)",
+                       "(female,25)",
+                       "(male,26)",
+                       "(female,27)",
+                       "(male,28)",
+                       "(female,29)"
+       };
+
+       protected String resultPath;
+
+       @Override
+       protected void preSubmit() throws Exception {
+               this.resultPath = this.getTempDirPath("result");
+       }
+
+       @Override
+       protected void postSubmit() throws Exception {
+               
compareResultsByLinesInMemory(Joiner.on("\n").join(expectedOutput), 
this.resultPath);
+       }
+
+       @Override
+       protected void testProgram() throws Exception {
+               // We need to remove the file scheme because we can't use the 
Flink file system.
+               // (to remain compatible with Storm)
+               SingleJoinExample.main(new String[]{ 
this.resultPath.replace("file:", "") });
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/88636799/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
index d0973cb..38147b2 100644
--- 
a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
+++ 
b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/split/SplitBoltTopology.java
@@ -17,12 +17,12 @@
  */
 package org.apache.flink.storm.split;
 
-import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import backtype.storm.topology.TopologyBuilder;
 import org.apache.flink.storm.split.operators.RandomSpout;
 import org.apache.flink.storm.split.operators.VerifyAndEnrichBolt;
-import org.apache.flink.storm.util.OutputFormatter;
 import org.apache.flink.storm.util.BoltFileSink;
 import org.apache.flink.storm.util.BoltPrintSink;
+import org.apache.flink.storm.util.OutputFormatter;
 import org.apache.flink.storm.util.TupleOutputFormatter;
 
 public class SplitBoltTopology {
@@ -33,8 +33,8 @@ public class SplitBoltTopology {
        public final static String sinkId = "sink";
        private final static OutputFormatter formatter = new 
TupleOutputFormatter();
 
-       public static FlinkTopologyBuilder buildTopology() {
-               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
+       public static TopologyBuilder buildTopology() {
+               final TopologyBuilder builder = new TopologyBuilder();
 
                builder.setSpout(spoutId, new RandomSpout(false, seed));
                builder.setBolt(boltId, new 
SplitBolt()).shuffleGrouping(spoutId);

Reply via email to