Repository: flink
Updated Branches:
  refs/heads/master 3f3aeb7e0 -> ea4f339d7


[FLINK-2305] Add documentation about Storm compatibility layer
additional improvements
- added RawOutputFormatter
- bug fix in SimpleOutputFormatter
- enable default output formatter for StormBoltFileSink

Closes #884


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea4f339d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea4f339d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea4f339d

Branch: refs/heads/master
Commit: ea4f339d76574c8499f3588524bb174ec3283a2d
Parents: 3f3aeb7
Author: mjsax <[email protected]>
Authored: Thu Jul 2 02:22:08 2015 +0200
Committer: Gyula Fora <[email protected]>
Committed: Sat Jul 11 14:19:43 2015 +0200

----------------------------------------------------------------------
 docs/_includes/navbar.html                      |   1 +
 docs/apis/storm_compatibility.md                | 155 +++++++++++++++++++
 .../excamation/ExclamationTopology.java         |   4 +-
 .../util/RawOutputFormatter.java                |  32 ++++
 .../util/SimpleOutputFormatter.java             |   2 +-
 .../util/StormBoltFileSink.java                 |   4 +
 6 files changed, 195 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/docs/_includes/navbar.html
----------------------------------------------------------------------
diff --git a/docs/_includes/navbar.html b/docs/_includes/navbar.html
index dc7ef30..cdd801f 100644
--- a/docs/_includes/navbar.html
+++ b/docs/_includes/navbar.html
@@ -81,6 +81,7 @@ under the License.
                 <li><a href="{{ apis }}/iterations.html">Iterations</a></li>
                 <li><a href="{{ apis }}/java8.html">Java 8</a></li>
                 <li><a href="{{ apis }}/hadoop_compatibility.html">Hadoop Compatability <span class="badge">Beta</span></a></li>
+                <li><a href="{{ apis }}/storm_compatibility.html">Storm Compatability <span class="badge">Beta</span></a></li>
               </ul>
             </li>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/docs/apis/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/storm_compatibility.md b/docs/apis/storm_compatibility.md
new file mode 100644
index 0000000..0f6b17b
--- /dev/null
+++ b/docs/apis/storm_compatibility.md
@@ -0,0 +1,155 @@
+---
+title: "Storm Compatibility"
+is_beta: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Flink streaming](streaming_guide.html) is compatible with Apache Storm interfaces and therefore allows
+reusing code that was implemented for Storm.
+
+You can:
+
+- execute a whole Storm `Topology` in Flink.
+- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+
+This document shows how to use existing Storm code with Flink.
+
+* This will be replaced by the TOC
+{:toc}
+
+### Project Configuration
+
+Support for Storm is contained in the `flink-storm-compatibility-core` Maven module.
+The code resides in the `org.apache.flink.stormcompatibility` package.
+
+Add the following dependency to your `pom.xml` if you want to execute Storm code in Flink.
+
+~~~xml
+<dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-storm-compatibility-core</artifactId>
+       <version>{{site.version}}</version>
+</dependency>
+~~~
+
+**Please note**: `flink-storm-compatibility-core` is not part of the provided binary Flink distribution. Thus, you need to include `flink-storm-compatibility-core` classes (and their dependencies) in your program jar that is submitted to Flink's JobManager.
+
+### Execute Storm Topologies
+
+Flink provides a Storm compatible API (`org.apache.flink.stormcompatibility.api`) that offers replacements for the following classes:
+
+- `TopologyBuilder` replaced by `FlinkTopologyBuilder`
+- `StormSubmitter` replaced by `FlinkSubmitter`
+- `NimbusClient` and `Client` replaced by `FlinkClient`
+- `LocalCluster` replaced by `FlinkLocalCluster`
+
+In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the original Storm client code that assembles the topology.
+If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanager.rpc.address` and `jobmanager.rpc.port`, respectively.
+If a parameter is not specified, the value is taken from `flink-conf.yaml`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new TopologyBuilder();
+
+builder.setSpout("source", new StormFileSpout(inputFilePath));
+builder.setBolt("tokenizer", new StormBoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new StormBoltCounter()).fieldsGrouping("tokenizer", new Fields("word"));
+builder.setBolt("sink", new StormBoltFileSink(outputFilePath)).shuffleGrouping("counter");
+
+Config conf = new Config();
+if(runLocal) { // submit to test cluster
+       FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
+       cluster.submitTopology("WordCount", conf, builder.createTopology());
+} else { // submit to remote cluster
+       // optional
+       // conf.put(Config.NIMBUS_HOST, "remoteHost");
+       // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+       FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
+}
+~~~
+</div>
+</div>
+
+### Embed Storm Operators in Flink Streaming Programs 
+
+As an alternative, Spouts and Bolts can be embedded into regular streaming programs.
+The Storm compatibility layer offers a wrapper class for each, namely `StormSpoutWrapper` and `StormBoltWrapper` (`org.apache.flink.stormcompatibility.wrappers`).
+
+By default, both wrappers convert Storm output tuples to Flink's `Tuple` types (i.e., `Tuple1` to `Tuple25` according to the number of fields of the Storm tuples).
+For single-field output tuples, a conversion to the field's data type is also possible (e.g., `String` instead of `Tuple1<String>`).
+
+Because Flink cannot infer the output field types of Storm operators, it is required to specify the output type manually.
+In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` can be used.
+
+#### Embed Spouts
+
+In order to use a Spout as Flink source, use `StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
+The Spout object is handed to the constructor of `StormSpoutWrapper<OUT>` that serves as first argument to `addSource(...)`.
+The generic type declaration `OUT` specifies the type of the source output stream.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// stream has `raw` type (single field output streams only)
+DataStream<String> rawInput = env.addSource(
+       new StormSpoutWrapper<String>(new StormFileSpout(localFilePath), true), // Spout source, 'true' for raw type
+       TypeExtractor.getForClass(String.class)); // output type
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
+If a Spout emits a finite number of tuples, `StormFiniteSpoutWrapper` can be used instead of `StormSpoutWrapper`.
+Using `StormFiniteSpoutWrapper` allows the Flink program to shut down automatically after all data is processed.
+If `StormSpoutWrapper` is used, the program will run until it is [canceled](cli.html) manually.
+
+
+#### Embed Bolts
+
+In order to use a Bolt as Flink operator, use `DataStream.transform(String, TypeInformation, OneInputStreamOperator)`.
+The Bolt object is handed to the constructor of `StormBoltWrapper<IN,OUT>` that serves as last argument to `transform(...)`.
+The generic type declarations `IN` and `OUT` specify the type of the operator's input and output stream, respectively.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> text = env.readTextFile(localFilePath);
+
+DataStream<Tuple2<String, Integer>> counts = text.transform(
+       "tokenizer", // operator name
+       TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // output type
+       new StormBoltWrapper<String, Tuple2<String, Integer>>(new StormBoltTokenizer())); // Bolt operator
+
+// do further processing
+[...]
+~~~
+</div>
+</div>
+
+### Storm Compatibility Examples
+
+You can find more examples in the Maven module `flink-storm-compatibility-examples`.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
index 9ff3fb7..a5bb571 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/excamation/ExclamationTopology.java
@@ -21,7 +21,7 @@ import org.apache.flink.examples.java.wordcount.util.WordCountData;
 import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
 import org.apache.flink.stormcompatibility.excamation.stormoperators.ExclamationBolt;
 import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.SimpleOutputFormatter;
+import org.apache.flink.stormcompatibility.util.RawOutputFormatter;
 import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
 import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
 import org.apache.flink.stormcompatibility.util.StormFileSpout;
@@ -36,7 +36,7 @@ public class ExclamationTopology {
        public final static String firstBoltId = "exclamation1";
        public final static String secondBoltId = "exclamation2";
        public final static String sinkId = "sink";
-       private final static OutputFormatter formatter = new SimpleOutputFormatter();
+       private final static OutputFormatter formatter = new RawOutputFormatter();
 
        public static FlinkTopologyBuilder buildTopology() {
                final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
new file mode 100644
index 0000000..7faf6cd
--- /dev/null
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/RawOutputFormatter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.stormcompatibility.util;
+
+import backtype.storm.tuple.Tuple;
+
+public class RawOutputFormatter implements OutputFormatter {
+       private static final long serialVersionUID = 8685668993521259832L;
+
+       @Override
+       public String format(final Tuple input) {
+               assert (input.size() == 1);
+               return input.getValue(0).toString();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
index a9d72d9..ccb617b 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
@@ -25,7 +25,7 @@ public class SimpleOutputFormatter implements OutputFormatter {
 
        @Override
        public String format(final Tuple input) {
-               return input.getValue(0).toString();
+               return input.getValues().toString();
        }
 
 }
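
Note on the `SimpleOutputFormatter` fix above: switching from `getValue(0)` to `getValues()` means the formatter now prints every field of a tuple instead of only the first, while the new `RawOutputFormatter` keeps the bare single-field output. The following is a minimal, self-contained sketch of that difference. It uses a plain `List<Object>` as a stand-in for Storm's `Tuple`, and the names `FormatterSketch`, `formatRaw`, and `formatSimple` are hypothetical and not part of this commit. Unlike the committed `RawOutputFormatter`, which uses `assert`, the sketch throws an exception on multi-field input so the check is active without `-ea`.

```java
import java.util.Arrays;
import java.util.List;

public class FormatterSketch {

    // Stand-in for RawOutputFormatter: expects exactly one field and
    // returns it without any list decoration.
    public static String formatRaw(List<Object> values) {
        if (values.size() != 1) {
            throw new IllegalArgumentException("raw format requires exactly one field");
        }
        return values.get(0).toString();
    }

    // Stand-in for the fixed SimpleOutputFormatter: returns all fields,
    // using the list's own toString() representation.
    public static String formatSimple(List<Object> values) {
        return values.toString();
    }

    public static void main(String[] args) {
        List<Object> wordCount = Arrays.<Object>asList("flink", 3);
        List<Object> line = Arrays.<Object>asList("hello!!!");

        System.out.println(formatSimple(wordCount)); // prints [flink, 3]
        System.out.println(formatRaw(line));         // prints hello!!!
    }
}
```

Before the fix, the simple formatter would have dropped the count and emitted only `flink` for the first tuple.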

http://git-wip-us.apache.org/repos/asf/flink/blob/ea4f339d/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
index a92bc6a..ee8dca4 100644
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
+++ b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
@@ -34,6 +34,10 @@ public final class StormBoltFileSink extends AbstractStormBoltSink {
        private final String path;
        private BufferedWriter writer;
 
+       public StormBoltFileSink(final String path) {
+               this(path, new SimpleOutputFormatter());
+       }
+
        public StormBoltFileSink(final String path, final OutputFormatter formatter) {
                super(formatter);
                this.path = path;
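
Note on the `StormBoltFileSink` change above: the added single-argument constructor delegates to the two-argument one with a `SimpleOutputFormatter`, which is what allows callers to write `new StormBoltFileSink(outputFilePath)` as in the documentation example. A minimal sketch of that constructor-chaining pattern follows. All types in it (`FileSinkSketch`, `Formatter`, `AllFieldsFormatter`) are hypothetical stand-ins rather than the real Storm or Flink classes, and the sketch does not write any file.

```java
import java.util.Arrays;
import java.util.List;

public class FileSinkSketch {

    /** Hypothetical stand-in for OutputFormatter. */
    public interface Formatter {
        String format(List<Object> values);
    }

    /** Stand-in default that prints all fields, like the fixed SimpleOutputFormatter. */
    public static final class AllFieldsFormatter implements Formatter {
        @Override
        public String format(List<Object> values) {
            return values.toString();
        }
    }

    private final String path;
    private final Formatter formatter;

    // Convenience constructor: falls back to the default formatter.
    public FileSinkSketch(String path) {
        this(path, new AllFieldsFormatter());
    }

    // Full constructor: the caller chooses the formatter.
    public FileSinkSketch(String path, Formatter formatter) {
        this.path = path;
        this.formatter = formatter;
    }

    public String getPath() {
        return path;
    }

    // Formats one record the way the sink would before writing it.
    public String render(List<Object> values) {
        return formatter.format(values);
    }

    public static void main(String[] args) {
        FileSinkSketch sink = new FileSinkSketch("out.txt");
        System.out.println(sink.render(Arrays.<Object>asList("flink", 3))); // prints [flink, 3]
    }
}
```

Keeping the two-argument constructor means existing callers that pass their own formatter, such as `ExclamationTopology` with `RawOutputFormatter`, are unaffected.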
