http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/savepoints.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/savepoints.md 
b/docs/apis/streaming/savepoints.md
new file mode 100644
index 0000000..ee4155f
--- /dev/null
+++ b/docs/apis/streaming/savepoints.md
@@ -0,0 +1,110 @@
+---
+title: "Savepoints"
+is_beta: false
+sub-nav-group: streaming
+sub-nav-pos: 4
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Programs written in the [Data Stream API]({{ site.baseurl 
}}/apis/streaming_guide.html) can resume execution from a **savepoint**. 
Savepoints allow you to update both your programs and your Flink cluster without 
losing any state. This page covers all steps to trigger, restore, and dispose of 
savepoints. For more details on how Flink handles state and failures, check out 
the [State in Streaming Programs]({{ site.baseurl }}/apis/state_backends.html) 
and [Fault Tolerance]({{ site.baseurl }}/apis/fault_tolerance.html) pages.
+
+* toc
+{:toc}
+
+## Overview
+
+Savepoints are **manually triggered checkpoints**, which take a snapshot of 
the program and write it out to a state backend. They rely on the regular 
checkpointing mechanism for this. During execution programs are periodically 
snapshotted on the worker nodes and produce checkpoints. For recovery only the 
last completed checkpoint is needed and older checkpoints can be safely 
discarded as soon as a new one is completed.
+
+Savepoints are similar to these periodic checkpoints except that they are 
**triggered by the user** and **don't automatically expire** when newer 
checkpoints are completed.
+
+<img src="fig/savepoints-overview.png" class="center" />
+
+In the above example the workers produce checkpoints **c<sub>1</sub>**, 
**c<sub>2</sub>**, **c<sub>3</sub>**, and **c<sub>4</sub>** for job *0xA312Bc*. 
Periodic checkpoints **c<sub>1</sub>** and **c<sub>3</sub>** have already been 
*discarded* and **c<sub>4</sub>** is the *latest checkpoint*. **c<sub>2</sub> 
is special**. It is the state associated with the savepoint **s<sub>1</sub>**: 
it was triggered by the user and doesn't expire automatically (as 
c<sub>1</sub> and c<sub>3</sub> did after the completion of newer checkpoints).
+
+Note that **s<sub>1</sub>** is only a **pointer to the actual checkpoint data 
c<sub>2</sub>**. This means that the actual state is *not copied* for the 
savepoint and periodic checkpoint data is kept around.
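
The retention rule described above can be modeled in a few lines of plain Java. This is an illustrative sketch, not Flink code: the class `RetentionSketch` and its methods are hypothetical, and checkpoints are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of checkpoint retention; not part of Flink.
class RetentionSketch {
    private final List<String> retained = new ArrayList<>();
    private final Set<String> pinnedBySavepoint = new LinkedHashSet<>();

    // A periodic checkpoint completes: every older checkpoint that is
    // not referenced by a savepoint is discarded.
    void completeCheckpoint(String id) {
        retained.removeIf(old -> !pinnedBySavepoint.contains(old));
        retained.add(id);
    }

    // A user-triggered savepoint completes: the underlying checkpoint
    // is pinned, so later checkpoints do not expire it.
    void completeSavepoint(String id) {
        pinnedBySavepoint.add(id);
        completeCheckpoint(id);
    }

    List<String> retained() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        RetentionSketch job = new RetentionSketch();
        job.completeCheckpoint("c1");
        job.completeSavepoint("c2"); // savepoint s1 points at c2
        job.completeCheckpoint("c3");
        job.completeCheckpoint("c4");
        System.out.println(job.retained()); // prints [c2, c4]
    }
}
```

As in the figure, only the pinned checkpoint c2 and the latest checkpoint c4 survive.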
+
+## Configuration
+
+Savepoints point to regular checkpoints and store their state in a configured 
[state backend]({{ site.baseurl }}/apis/state_backends.html). Currently, the 
supported state backends are **jobmanager** and **filesystem**. The state 
backend configuration for the regular periodic checkpoints is **independent** 
of the savepoint state backend configuration. Checkpoint data is **not copied** 
for savepoints; a savepoint only points to the data in the configured checkpoint state backend.
+
+### JobManager
+
+This is the **default backend** for savepoints.
+
+Savepoints are stored on the heap of the job manager. They are *lost* after 
the job manager is shut down. This mode is only useful if you want to *stop* 
and *resume* your program while the **same cluster** keeps running. It is *not 
recommended* for production use. Savepoints are *not* part of the [job 
manager's highly available]({{ site.baseurl 
}}/setup/jobmanager_high_availability.html) state.
+
+<pre>
+savepoints.state.backend: jobmanager
+</pre>
+
+**Note**: If you don't configure a specific state backend for the savepoints, 
the jobmanager backend will be used.
+
+### File system
+
+Savepoints are stored in the configured **file system directory**. They are 
available between cluster instances and allow you to move your program to another 
cluster.
+
+<pre>
+savepoints.state.backend: filesystem
+savepoints.state.backend.fs.dir: hdfs:///flink/savepoints
+</pre>
+
+**Note**: If you don't configure a specific directory, the job manager backend 
will be used.
+
+**Important**: A savepoint is a pointer to a completed checkpoint. That means 
that the state of a savepoint is not only found in the savepoint file itself, 
but also needs the actual checkpoint data (e.g. in a set of further files). 
Therefore, using the *filesystem* backend for savepoints and the *jobmanager* 
backend for checkpoints does not work, because the required checkpoint data 
won't be available after a job manager restart.
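
One combination that avoids this problem is to keep both savepoints and checkpoints on a file system. The fragment below is a sketch that only combines the configuration keys shown on this page and on the state backends page; the HDFS paths are placeholders.

```yaml
# Savepoint pointers are written to a durable file system
savepoints.state.backend: filesystem
savepoints.state.backend.fs.dir: hdfs:///flink/savepoints

# The checkpoint data that the savepoints point to is durable as well
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
```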
+
+## Changes to your program
+
+Savepoints **work out of the box**, but it is **highly recommended** that you 
slightly adjust your programs in order to be able to work with savepoints in 
future versions of your program.
+
+<img src="fig/savepoints-program_ids.png" class="center" />
+
+For savepoints **only stateful tasks matter**. In the above example, the 
source and map tasks are stateful whereas the sink is not stateful. Therefore, 
only the state of the source and map tasks is part of the savepoint.
+
+Each task is identified by its **generated task ID** and **subtask index**. 
In the above example the state of the source (**s<sub>1</sub>**, 
**s<sub>2</sub>**) and map tasks (**m<sub>1</sub>**, **m<sub>2</sub>**) is 
identified by their respective task ID (*0xC322EC* for the source tasks and 
*0x27B3EF* for the map tasks) and subtask index. There is no state for the 
sinks (**t<sub>1</sub>**, **t<sub>2</sub>**). Their IDs therefore do not matter.
+
+<span class="label label-danger">Important</span> The IDs are generated 
**deterministically** from your program structure. This means that as long as 
your program does not change, the IDs do not change. **The only allowed changes 
are within the user function, e.g. you can change the implemented `MapFunction` 
without changing the topology**. In this case, it is straightforward to 
restore the state from a savepoint by mapping it back to the same task IDs and 
subtask indexes. This allows you to work with savepoints out of the box, but 
gets problematic as soon as you make changes to the topology, because they 
result in changed IDs and the savepoint state cannot be mapped to your program 
any more.
+
+<span class="label label-info">Recommended</span> In order to be able to 
change your program and **have fixed IDs**, the *DataStream* API provides a 
method to manually specify the task IDs. Each operator provides a 
**`uid(String)`** method to override the generated ID. The ID is a String, 
which will be deterministically hashed to a 16-byte hash value. It is 
**important** that the specified IDs are **unique per transformation and job**. 
If this is not the case, job submission will fail.
+
+{% highlight java %}
+DataStream<String> stream = env
+  // Stateful source (e.g. Kafka) with ID
+  .addSource(new StatefulSource())
+  .uid("source-id")
+  .shuffle()
+  // The stateful mapper with ID
+  .map(new StatefulMapper())
+  .uid("mapper-id");
+
+// Stateless sink (no specific ID required)
+stream.print();
+{% endhighlight %}
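
To see why a manually assigned ID stays stable across program changes, consider how a string can be hashed deterministically to 16 bytes. The sketch below uses only the Java standard library. It is an assumption for illustration: MD5 is chosen solely because it produces a 16-byte digest, and the hash function Flink applies internally may differ. The class `UidHashSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical illustration; not the hashing code used by Flink.
class UidHashSketch {
    // Assumption: MD5 stands in for "some deterministic 16-byte hash".
    static byte[] hashUid(String uid) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            return md5.digest(uid.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            // MD5 must be supported by every Java platform implementation.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] first = hashUid("source-id");
        byte[] again = hashUid("source-id");
        byte[] other = hashUid("mapper-id");
        System.out.println(first.length);                // prints 16
        System.out.println(Arrays.equals(first, again)); // prints true
        System.out.println(Arrays.equals(first, other)); // prints false
    }
}
```

The hash depends only on the string, not on the surrounding topology, so the state stays addressable as long as the `uid` is unchanged.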
+
+## Command-line client
+
+You control the savepoints via the [command line 
client]({{site.baseurl}}/apis/cli.html#savepoints).
+
+## Current limitations
+
+- **Parallelism**: When restoring a savepoint, the parallelism of the program 
has to match the parallelism of the original program from which the savepoint 
was drawn. There is no mechanism to re-partition the savepoint's state yet.
+
+- **Chaining**: Chained operators are identified by the ID of the first task. 
It's not possible to manually assign an ID to an intermediate chained task, 
e.g. in the chain `[  a -> b -> c ]` only **a** can have its ID assigned 
manually, but not **b** or **c**. To work around this, you can [manually define 
the task chains](streaming_guide.html#task-chaining-and-resource-groups). If 
you rely on the automatic ID assignment, a change in the chaining behaviour 
will also change the IDs.
+
+- **Disposing custom state handles**: Disposing an old savepoint does not work 
with custom state handles (if you are using a custom state backend), because 
the user code class loader is not available during disposal.

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/state_backends.md 
b/docs/apis/streaming/state_backends.md
new file mode 100644
index 0000000..06f9b24
--- /dev/null
+++ b/docs/apis/streaming/state_backends.md
@@ -0,0 +1,123 @@
+---
+title:  "State Backends"
+sub-nav-group: streaming
+sub-nav-pos: 1
+sub-nav-parent: fault_tolerance
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Programs written in the [Data Stream API]({{ site.baseurl 
}}/apis/streaming_guide.html) often hold state in various forms:
+
+- Windows gather elements or aggregates until they are triggered
+- Transformation functions may use the key/value state interface to store 
values
+- Transformation functions may implement the `Checkpointed` interface to make 
their local variables fault tolerant
+
+See also [Working with State]({{ site.baseurl 
}}/apis/streaming_guide.html#working_with_state) in the streaming API guide.
+
+When checkpointing is activated, such state is persisted upon checkpoints to 
guard against data loss and recover consistently.
+How the state is represented internally, and how and where it is persisted 
upon checkpoints depend on the
+chosen **State Backend**.
+
+
+## Available State Backends
+
+Out of the box, Flink bundles two state backends: *MemoryStateBackend* and 
*FsStateBackend*. If nothing else is configured,
+the system will use the MemoryStateBackend.
+
+
+### The MemoryStateBackend
+
+The *MemoryStateBackend* holds data internally as objects on the Java heap. 
Key/value state and window operators hold hash tables
+that store the values, triggers, etc.
+
+Upon checkpoints, this state backend will snapshot the state and send it as 
part of the checkpoint acknowledgement messages to the
+JobManager (master), which stores it on its heap as well.
+
+Limitations of the MemoryStateBackend:
+
+  - The size of each individual state is by default limited to 5 MB. This 
value can be increased in the constructor of the MemoryStateBackend.
+  - Irrespective of the configured maximal state size, the state cannot be 
larger than the akka frame size (see [Configuration]({{ site.baseurl 
}}/setup/config.html)).
+  - The aggregate state must fit into the JobManager memory.
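
The first two limitations interact: raising the maximum state size does not help once the akka frame size is reached. A minimal sketch of that rule in plain Java follows. The class `MemoryBackendLimitSketch` is hypothetical, the frame size is a placeholder rather than a documented default, and treating the limit as inclusive is an assumption.

```java
// Hypothetical helper that models the size limits listed above; not Flink code.
class MemoryBackendLimitSketch {
    // 5 MB default per-state limit, as stated in the list above.
    static final long DEFAULT_MAX_STATE_SIZE = 5L * 1024 * 1024;

    // The effective limit is the smaller of the configured maximum
    // state size and the akka frame size.
    static long effectiveLimit(long configuredMaxStateSize, long akkaFrameSize) {
        return Math.min(configuredMaxStateSize, akkaFrameSize);
    }

    static boolean fits(long stateSize, long configuredMaxStateSize, long akkaFrameSize) {
        return stateSize <= effectiveLimit(configuredMaxStateSize, akkaFrameSize);
    }

    public static void main(String[] args) {
        long frameSize = 10L * 1024 * 1024; // placeholder value
        long raisedMax = 64L * 1024 * 1024; // maximum raised via the constructor

        System.out.println(fits(4L * 1024 * 1024, DEFAULT_MAX_STATE_SIZE, frameSize)); // prints true
        System.out.println(fits(6L * 1024 * 1024, DEFAULT_MAX_STATE_SIZE, frameSize)); // prints false
        System.out.println(effectiveLimit(raisedMax, frameSize)); // prints 10485760
    }
}
```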
+
+The MemoryStateBackend is encouraged for:
+
+  - Local development and debugging
+  - Jobs that hold little state, such as jobs that consist only of 
record-at-a-time functions (Map, FlatMap, Filter, ...). The Kafka Consumer 
requires very little state.
+
+
+### The FsStateBackend
+
+The *FsStateBackend* (FileSystemStateBackend) is configured with a file system 
URL (type, address, path), such as 
"hdfs://namenode:40010/flink/checkpoints" or "file:///data/flink/checkpoints".
+
+The FsStateBackend holds in-flight data in the TaskManager's memory. Upon 
checkpoints, it writes state snapshots into files in the configured file system 
and directory. Minimal metadata is stored in the JobManager's memory (or, in 
high-availability mode, in the metadata checkpoint).
+
+The FsStateBackend is encouraged for:
+
+  - Jobs with large state, long windows, large key/value states.
+  - All high-availability setups.
+
+
+## Configuring a State Backend
+
+State backends can be configured per job. In addition, you can define a 
default state backend to be used when the
+job does not explicitly define a state backend.
+
+
+### Setting the Per-job State Backend
+
+The per-job state backend is set on the `StreamExecutionEnvironment` of the 
job, as shown in the example below:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(new 
FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStateBackend(new 
FsStateBackend("hdfs://namenode:40010/flink/checkpoints"))
+{% endhighlight %}
+</div>
+</div>
+
+
+### Setting Default State Backend
+
+A default state backend can be configured in the `flink-conf.yaml`, using the 
configuration key `state.backend`.
+
+Possible values for the config entry are *jobmanager* (MemoryStateBackend), 
*filesystem* (FsStateBackend), or the fully qualified class
+name of the class that implements the state backend factory 
[FsStateBackendFactory](https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java).
+
+In the case where the default state backend is set to *filesystem*, the entry 
`state.backend.fs.checkpointdir` defines the directory where the checkpoint 
data will be stored.
+
+A sample section in the configuration file could look as follows:
+
+~~~
+# The backend that will be used to store operator state checkpoints
+
+state.backend: filesystem
+
+
+# Directory for storing checkpoints
+
+state.backend.fs.checkpointdir: hdfs://namenode:40010/flink/checkpoints
+~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/storm_compatibility.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/storm_compatibility.md 
b/docs/apis/streaming/storm_compatibility.md
new file mode 100644
index 0000000..0ea0b01
--- /dev/null
+++ b/docs/apis/streaming/storm_compatibility.md
@@ -0,0 +1,287 @@
+---
+title: "Storm Compatibility"
+is_beta: true
+sub-nav-group: streaming
+sub-nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+[Flink streaming](streaming_guide.html) is compatible with Apache Storm 
interfaces and therefore allows
+reusing code that was implemented for Storm.
+
+You can:
+
+- execute a whole Storm `Topology` in Flink.
+- use Storm `Spout`/`Bolt` as source/operator in Flink streaming programs.
+
+This document shows how to use existing Storm code with Flink.
+
+* This will be replaced by the TOC
+{:toc}
+
+# Project Configuration
+
+Support for Storm is contained in the `flink-storm` Maven module.
+The code resides in the `org.apache.flink.storm` package.
+
+Add the following dependency to your `pom.xml` if you want to execute Storm 
code in Flink.
+
+~~~xml
+<dependency>
+       <groupId>org.apache.flink</groupId>
+       <artifactId>flink-storm</artifactId>
+       <version>{{site.version}}</version>
+</dependency>
+~~~
+
+**Please note**: Do not add `storm-core` as a dependency. It is already 
included via `flink-storm`.
+
+**Please note**: `flink-storm` is not part of the provided binary Flink 
distribution.
+Thus, you need to include `flink-storm` classes (and their dependencies) in 
your program jar (also called ueber-jar or fat-jar) that is submitted to 
Flink's JobManager.
+See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example of how 
to package a jar correctly.
+
+If you want to avoid large ueber-jars, you can manually copy 
`storm-core-0.9.4.jar`, `json-simple-1.1.jar` and 
`flink-storm-{{site.version}}.jar` into Flink's `lib/` folder of each cluster 
node (*before* the cluster is started).
+For this case, it is sufficient to include only your own Spout and Bolt 
classes (and their internal dependencies) into the program jar.
+
+# Execute Storm Topologies
+
+Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that 
offers replacements for the following classes:
+
+- `StormSubmitter` replaced by `FlinkSubmitter`
+- `NimbusClient` and `Client` replaced by `FlinkClient`
+- `LocalCluster` replaced by `FlinkLocalCluster`
+
+In order to submit a Storm topology to Flink, it is sufficient to replace the 
used Storm classes with their Flink replacements in the Storm *client code that 
assembles* the topology.
+The actual runtime code, i.e., Spouts and Bolts, can be used *unmodified*.
+If a topology is executed in a remote cluster, parameters `nimbus.host` and 
`nimbus.thrift.port` are used as `jobmanager.rpc.address` and 
`jobmanager.rpc.port`, respectively.  If a parameter is not specified, the value 
is taken from `flink-conf.yaml`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder
+
+// actual topology assembling code and used Spouts/Bolts can be used as-is
+builder.setSpout("source", new FileSpout(inputFilePath));
+builder.setBolt("tokenizer", new BoltTokenizer()).shuffleGrouping("source");
+builder.setBolt("counter", new BoltCounter()).fieldsGrouping("tokenizer", new 
Fields("word"));
+builder.setBolt("sink", new 
BoltFileSink(outputFilePath)).shuffleGrouping("counter");
+
+Config conf = new Config();
+if(runLocal) { // submit to test cluster
+       // replaces: LocalCluster cluster = new LocalCluster();
+       FlinkLocalCluster cluster = new FlinkLocalCluster();
+       cluster.submitTopology("WordCount", conf, 
FlinkTopology.createTopology(builder));
+} else { // submit to remote cluster
+       // optional
+       // conf.put(Config.NIMBUS_HOST, "remoteHost");
+       // conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
+       // replaces: StormSubmitter.submitTopology(topologyId, conf, 
builder.createTopology());
+       FlinkSubmitter.submitTopology("WordCount", conf, 
FlinkTopology.createTopology(builder));
+}
+~~~
+</div>
+</div>
+
+# Embed Storm Operators in Flink Streaming Programs
+
+As an alternative, Spouts and Bolts can be embedded into regular streaming 
programs.
+The Storm compatibility layer offers a wrapper class for each, namely 
`SpoutWrapper` and `BoltWrapper` (`org.apache.flink.storm.wrappers`).
+
+By default, both wrappers convert Storm output tuples to Flink's 
[Tuple](programming_guide.html#tuples-and-case-classes) types (i.e., `Tuple0` to 
`Tuple25` according to the number of fields of the Storm tuples).
+For single field output tuples a conversion to the field's data type is also 
possible (e.g., `String` instead of `Tuple1<String>`).
+
+Because Flink cannot infer the output field types of Storm operators, it is 
required to specify the output type manually.
+In order to get the correct `TypeInformation` object, Flink's `TypeExtractor` 
can be used.
+
+## Embed Spouts
+
+In order to use a Spout as Flink source, use 
`StreamExecutionEnvironment.addSource(SourceFunction, TypeInformation)`.
+The Spout object is handed to the constructor of `SpoutWrapper<OUT>` that 
serves as first argument to `addSource(...)`.
+The generic type declaration `OUT` specifies the type of the source output 
stream.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// stream has `raw` type (single field output streams only)
+DataStream<String> rawInput = env.addSource(
+       new SpoutWrapper<String>(new FileSpout(localFilePath), new String[] { 
Utils.DEFAULT_STREAM_ID }), // emit default output stream as raw type
+       TypeExtractor.getForClass(String.class)); // output type
+
+// process data stream
+[...]
+~~~
+</div>
+</div>
+
+If a Spout emits a finite number of tuples, `SpoutWrapper` can be configured 
to terminate automatically by setting the `numberOfInvocations` parameter in its 
constructor.
+This allows the Flink program to shut down automatically after all data is 
processed.
+By default, the program will run until it is [canceled](cli.html) manually.
+
+
+## Embed Bolts
+
+In order to use a Bolt as Flink operator, use `DataStream.transform(String, 
TypeInformation, OneInputStreamOperator)`.
+The Bolt object is handed to the constructor of `BoltWrapper<IN,OUT>` that 
serves as last argument to `transform(...)`.
+The generic type declarations `IN` and `OUT` specify the type of the 
operator's input and output stream, respectively.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+DataStream<String> text = env.readTextFile(localFilePath);
+
+DataStream<Tuple2<String, Integer>> counts = text.transform(
+       "tokenizer", // operator name
+       TypeExtractor.getForObject(new Tuple2<String, Integer>("", 0)), // 
output type
+       new BoltWrapper<String, Tuple2<String, Integer>>(new BoltTokenizer())); 
// Bolt operator
+
+// do further processing
+[...]
+~~~
+</div>
+</div>
+
+### Named Attribute Access for Embedded Bolts
+
+Bolts can access input tuple fields by name (in addition to access by 
index).
+To use this feature with embedded Bolts, you need to have either a
+
+ 1. [POJO](programming_guide.html#pojos) type input stream or
+ 2. [Tuple](programming_guide.html#tuples-and-case-classes) type input stream 
and specify the input schema (i.e., name-to-index-mapping)
+
+For POJO input types, Flink accesses the fields via reflection.
+For this case, Flink expects either a corresponding public member variable or 
public getter method.
+For example, if a Bolt accesses a field via name `sentence` (e.g., `String s = 
input.getStringByField("sentence");`), the input POJO class must have a member 
variable `public String sentence;` or method `public String getSentence() { ... 
};` (pay attention to camel-case naming).
+
+For `Tuple` input types, it is required to specify the input schema using 
Storm's `Fields` class.
+For this case, the constructor of `BoltWrapper` takes an additional argument: 
`new BoltWrapper<Tuple1<String>, ...>(..., new Fields("sentence"))`.
+The input type is `Tuple1<String>` and `Fields("sentence")` specifies that 
`input.getStringByField("sentence")` is equivalent to `input.getString(0)`.
+
+See 
[BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountPojo.java)
 and 
[BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/wordcount/BoltTokenizerWordCountWithNames.java)
 for examples.  
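
The POJO lookup rule described above (a public member variable, or else a public getter in camel case) can be sketched with plain Java reflection. This is an illustration under stated assumptions, not Flink's implementation: the class `NamedFieldAccessSketch`, its nested POJOs, and the helper `getByName` are all hypothetical.

```java
// Hypothetical illustration of name-based field access; not Flink code.
class NamedFieldAccessSketch {
    // POJO exposing the field directly.
    public static class WithField {
        public String sentence = "from field";
    }

    // POJO exposing the field through a getter (note the camel case).
    public static class WithGetter {
        private final String sentence = "from getter";

        public String getSentence() {
            return sentence;
        }
    }

    // Try a public member variable first, then fall back to a public getter.
    static Object getByName(Object pojo, String name) {
        Class<?> type = pojo.getClass();
        try {
            try {
                return type.getField(name).get(pojo); // public fields only
            } catch (NoSuchFieldException noPublicField) {
                String getter = "get" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
                return type.getMethod(getter).invoke(pojo);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("No public field or getter for '" + name + "'", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getByName(new WithField(), "sentence"));  // prints from field
        System.out.println(getByName(new WithGetter(), "sentence")); // prints from getter
    }
}
```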
+
+## Configuring Spouts and Bolts
+
+In Storm, Spouts and Bolts can be configured with a globally distributed `Map` 
object that is given to `submitTopology(...)` method of `LocalCluster` or 
`StormSubmitter`.
+This `Map` is provided by the user next to the topology and gets forwarded as 
a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., 
there is no special attention required &ndash; it works as in regular Storm.
+
+For embedded usage, Flink's configuration mechanism must be used.
+A global configuration can be set in a `StreamExecutionEnvironment` via 
`.getConfig().setGlobalJobParameters(...)`.
+Flink's regular `Configuration` class can be used to configure Spouts and 
Bolts.
+However, `Configuration` does not support arbitrary key data types as Storm 
does (only `String` keys are allowed).
+Thus, Flink additionally provides the `StormConfig` class that can be used like a 
raw `Map` to provide full compatibility to Storm.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+StormConfig config = new StormConfig();
+// set config values
+[...]
+
+// set global Storm configuration
+env.getConfig().setGlobalJobParameters(config);
+
+// assemble program with embedded Spouts and/or Bolts
+[...]
+~~~
+</div>
+</div>
+
+## Multiple Output Streams
+
+Flink can also handle the declaration of multiple output streams for Spouts 
and Bolts.
+If a whole topology is executed in Flink using `FlinkTopologyBuilder` etc., 
there is no special attention required &ndash; it works as in regular Storm.
+
+For embedded usage, the output stream will be of data type 
`SplitStreamType<T>` and must be split by using `DataStream.split(...)` and 
`SplitStream.select(...)`.
+Flink provides the predefined output selector `StormStreamSelector<T>` for 
`.split(...)` already.
+Furthermore, the wrapper type `SplitStreamType<T>` can be removed using 
`SplitStreamMapper<T>`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+[...]
+
+// get DataStream from Spout or Bolt which declares two output streams s1 and 
s2 with output type SomeType
+DataStream<SplitStreamType<SomeType>> multiStream = ...
+
+SplitStream<SplitStreamType<SomeType>> splitStream = multiStream.split(new 
StormStreamSelector<SomeType>());
+
+// remove SplitStreamType using SplitStreamMapper to get data stream of type 
SomeType
+DataStream<SomeType> s1 = splitStream.select("s1").map(new 
SplitStreamMapper<SomeType>()).returns(SomeType.class);
+DataStream<SomeType> s2 = splitStream.select("s2").map(new 
SplitStreamMapper<SomeType>()).returns(SomeType.class);
+
+// do further processing on s1 and s2
+[...]
+~~~
+</div>
+</div>
+
+See 
[SpoutSplitExample.java](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/src/main/java/org/apache/flink/storm/split/SpoutSplitExample.java)
 for a full example.
+
+# Flink Extensions
+
+## Finite Spouts
+
+In Flink, streaming sources can be finite, i.e., emit a finite number of records 
and stop after emitting the last record. However, Spouts usually emit infinite 
streams.
+The bridge between the two approaches is the `FiniteSpout` interface which, in 
addition to `IRichSpout`, contains a `reachedEnd()` method, where the user can 
specify a stopping-condition.
+The user can create a finite Spout by implementing this interface instead of 
(or additionally to) `IRichSpout`, and implementing the `reachedEnd()` method 
in addition.
+In contrast to a `SpoutWrapper` that is configured to emit a finite number of 
tuples, the `FiniteSpout` interface allows implementing more complex termination 
criteria.
+
+Although finite Spouts are not necessary to embed Spouts into a Flink 
streaming program or to submit a whole Storm topology to Flink, there are cases 
where they may come in handy:
+
+ * to make a native Spout behave the same way as a finite Flink 
source with minimal modifications
+ * the user wants to process a stream only for some time; after that, the 
Spout can stop automatically
+ * reading a file into a stream
+ * for testing purposes
+
+An example of a finite Spout that emits records for 10 seconds only:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+~~~java
+public class TimedFiniteSpout extends BaseRichSpout implements FiniteSpout {
+       [...] // implement open(), nextTuple(), ...
+
+       private long starttime = System.currentTimeMillis();
+
+       public boolean reachedEnd() {
+               return System.currentTimeMillis() - starttime > 10000L;
+       }
+}
+~~~
+</div>
+</div>
+
+# Storm Compatibility Examples
+
+You can find more examples in Maven module `flink-storm-examples`.
+For the different versions of WordCount, see 
[README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples/README.md).
+To run the examples, you need to assemble a correct jar file.
+`flink-storm-examples-{{ site.version }}.jar` is **not** a valid jar file for job 
execution (it is only a standard maven artifact).
+
+There are example jars for embedded Spout and Bolt, namely 
`WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
+Compare `pom.xml` to see how both jars are built.
+Furthermore, there is one example for whole Storm topologies 
(`WordCount-StormTopology.jar`).
+
+You can run each of those examples via `bin/flink run <jarname>.jar`. The 
correct entry point class is contained in each jar's manifest file.
