    Diff: docs/quickstart/run_example_quickstart.md
    
     * This will be replaced by the TOC
    In this guide we will start from scratch and go from setting up a Flink project to running
([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on 
    -On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
    +a streaming analysis program on a Flink cluster.
    Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
    read this channel in Flink and count the number of bytes that each user edits within
    a given window of time. This is easy enough to implement in a few minutes using Flink, but it will
    give you a good foundation from which to start building more complex analysis programs on your own.
    ## Setting up a Maven Project
    We are going to use a Flink Maven Archetype for creating our project structure. Please
    +see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
    about this. For our purposes, the command to run is this:
    {% highlight bash %}
    $ mvn archetype:generate \
        -DarchetypeGroupId=org.apache.flink \
        -DarchetypeArtifactId=flink-quickstart-java \
        -DarchetypeVersion=1.0.0 \
        -DgroupId=wiki-edits \
        -DartifactId=wiki-edits \
        -Dversion=0.1 \
        -Dpackage=wikiedits \
        -DinteractiveMode=false
    {% endhighlight %}
    You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
    Maven will create a project structure that looks like this:
    {% highlight bash %}
    $ tree wiki-edits
    ├── pom.xml
    └── src
    └── main
    ├── java
    │   └── wikiedits
    │       ├── Job.java
    │       ├── SocketTextStreamWordCount.java
    │       └── WordCount.java
    └── resources
    └── log4j.properties
    {% endhighlight %}
    In the root directory there is our `pom.xml` file, which already has the Flink dependencies added, and
    there are several example Flink programs in `src/main/java`. We can delete the example programs, since
    we are going to start from scratch:
    {% highlight bash %}
    $ rm wiki-edits/src/main/java/wikiedits/*.java
    {% endhighlight %}
    As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
    use it in our program. Edit the `dependencies` section of the `pom.xml` so that it looks like this:
    {% highlight xml %}
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.10</artifactId>
        <version>${flink.version}</version>
    </dependency>
    {% endhighlight %}
    Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    ## Writing a Flink Program
    It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
    {% highlight java %}
    package wikiedits;
    public class WikipediaAnalysis {
      public static void main(String[] args) throws Exception {
      }
    }
    {% endhighlight %}
    I admit it's very bare-bones now, but we will fill it in as we go. Note that I won't give
    import statements here, since IDEs can add them automatically. At the end of this section I'll show
    the complete code with import statements if you simply want to skip ahead and enter that in your
    editor.
    The first step in a Flink program is to create a `StreamExecutionEnvironment`
    (or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
    parameters and create sources for reading from external systems. So let's go ahead and add
    this to the main method:
    {% highlight java %}
    StreamExecutionEnvironment see = 
    {% endhighlight %}
    Next, we will create a source that reads from the Wikipedia IRC log:
    {% highlight java %}
    DataStream<WikipediaEditEvent> edits = see.addSource(new 
    {% endhighlight %}
    This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    the purposes of this example, we are interested in determining the number of added or removed
    bytes that each user causes in a certain time window, let's say five seconds. For this we first
    have to specify that we want to key the stream on the user name, that is to say that operations
    on this stream should take the key into account. In our case, the summation of edited bytes in the
    windows should be per unique user. For keying a stream we have to provide a `KeySelector`, like this:
    {% highlight java %}
    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
        @Override
        public String getKey(WikipediaEditEvent event) {
          return event.getUser();
        }
      });
    {% endhighlight %}
    This gives us a stream of the same `WikipediaEditEvent` elements, keyed by a `String` that is the user
    name. We can now specify that we want to have windows imposed on this stream and compute a
    result based on the elements in these windows. We need to specify a window here because we are
    dealing with an infinite stream of events. If you want to compute an aggregation on such an
    infinite stream, you never know when you are finished. That's where windows come into play:
    they specify a time slice in which we should perform our computation. In our example we will say
    that we want to aggregate the sum of edited bytes for every five seconds:
    {% highlight java %}
    DataStream<Tuple2<String, Long>> result = keyedEdits
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
          acc.f0 = event.getUser();
          acc.f1 += event.getByteDiff();
          return acc;
        }
      });
    {% endhighlight %}
    The first call, `.window()`, specifies that we want to have tumbling (non-overlapping) windows
    of five seconds. The second call specifies a *Fold transformation* on each window slice for
    each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
    difference of every edit in that time window for a user. The resulting stream now contains
    a `Tuple2<String, Long>` for every user, which gets emitted every five seconds.
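To make the keyBy/window/fold combination concrete, here is a plain-Java sketch with no Flink dependency. It assumes a hypothetical `Edit` class standing in for `WikipediaEditEvent`, and explicit millisecond timestamps standing in for processing time. It mirrors the idea that tumbling windows are aligned to multiples of the window size and keeps a running sum per (window, user) pair, which is what the fold above accumulates; it illustrates the semantics only, not how Flink implements windowing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink dependency) of what keyBy(user), 5-second
// tumbling windows and the summing fold compute together.
// Edit is a hypothetical stand-in for WikipediaEditEvent.
class TumblingFoldSketch {

    static final class Edit {
        final long timestampMillis; // stands in for the processing time of the edit
        final String user;
        final long byteDiff;

        Edit(long timestampMillis, String user, long byteDiff) {
            this.timestampMillis = timestampMillis;
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    // Start of the tumbling window containing the timestamp: windows are
    // non-overlapping and aligned to multiples of sizeMillis.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Maps each window start to the summed byte diff per user in that window.
    static Map<Long, Map<String, Long>> sumBytesPerUserPerWindow(List<Edit> edits, long sizeMillis) {
        Map<Long, Map<String, Long>> result = new LinkedHashMap<>();
        for (Edit e : edits) {
            long start = windowStart(e.timestampMillis, sizeMillis);
            Map<String, Long> perUser =
                result.computeIfAbsent(start, k -> new LinkedHashMap<String, Long>());
            // Same effect as folding from 0: the first edit seeds the sum for this
            // (window, user) pair and later edits are added to it.
            perUser.merge(e.user, e.byteDiff, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edit> edits = new ArrayList<>();
        edits.add(new Edit(1_000, "alice", 120));
        edits.add(new Edit(3_500, "bob", -40));
        edits.add(new Edit(4_900, "alice", 30));
        edits.add(new Edit(5_200, "alice", 7));
        // prints {0={alice=150, bob=-40}, 5000={alice=7}}
        System.out.println(sumBytesPerUserPerWindow(edits, 5_000));
    }
}
```

With 5-second windows, the edits at 1.0 s, 3.5 s and 4.9 s fall into the window starting at 0, while the edit at 5.2 s opens the next window, so `alice` gets one sum per window.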
    The only thing left to do is print the stream to the console and start execution:
    {% highlight java %}
    result.print();
    see.execute();
    {% endhighlight %}
    That last call is necessary to start the actual Flink job. All operations, such as creating
    sources, transformations, and sinks, only build up a graph of internal operations. Only when
    `execute()` is called is this graph of operations submitted to a cluster or executed on your local
    machine.
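The following plain-Java sketch illustrates this lazy evaluation with a hypothetical `LazyPipelineSketch` class (not a Flink API): `map()` and `sink()` only record steps, and records flow through them only when `execute()` is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical mini pipeline illustrating lazy execution: map() and sink()
// only record steps; nothing is processed until execute() is called.
// This is not Flink's API, just the same build-then-run idea.
class LazyPipelineSketch {
    private final List<String> source;
    private final List<UnaryOperator<String>> transformations = new ArrayList<>();
    private Consumer<String> sinkFn = value -> { }; // discards values until a sink is set

    LazyPipelineSketch(List<String> source) {
        this.source = source;
    }

    LazyPipelineSketch map(UnaryOperator<String> f) {
        transformations.add(f); // recorded in the "graph", not applied yet
        return this;
    }

    LazyPipelineSketch sink(Consumer<String> s) {
        this.sinkFn = s;        // also only recorded
        return this;
    }

    // Only now does each element flow through the recorded steps.
    void execute() {
        for (String element : source) {
            String value = element;
            for (UnaryOperator<String> f : transformations) {
                value = f.apply(value);
            }
            sinkFn.accept(value);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        LazyPipelineSketch pipeline = new LazyPipelineSketch(Arrays.asList("a", "b"))
            .map(s -> s.toUpperCase())
            .sink(out::add);
        System.out.println(out); // prints [] because nothing has run yet
        pipeline.execute();
        System.out.println(out); // prints [A, B]
    }
}
```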
    The complete code so far is this:
    {% highlight java %}
    package wikiedits;
    import org.apache.flink.api.common.functions.FoldFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    public class WikipediaAnalysis {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = 
        DataStream<WikipediaEditEvent> edits = see.addSource(new 
        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
          .keyBy(new KeySelector<WikipediaEditEvent, String>() {
            @Override
            public String getKey(WikipediaEditEvent event) {
              return event.getUser();
            }
          });
        DataStream<Tuple2<String, Long>> result = keyedEdits
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
            @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +    result.print();
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +You can run this example in your IDE or on the command line, using Maven:
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +The first command builds our project and the second executes our main class. The output should be
    +similar to this:
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +The number in front of each line tells you on which parallel instance of the print sink the output
    +was produced.
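As a rough illustration of why all results for one user come out of the same parallel instance, here is a plain-Java sketch. The hash-modulo partitioning in `instanceFor` is a simplified assumption, not Flink's exact key-to-instance scheme; the only property it shares with Flink is that equal keys always map to the same instance. The formatting follows the sample output above, where instances are numbered starting from 1.

```java
// Plain-Java sketch of two facts from the text: all results for one key are
// produced by the same parallel instance, and each printed line is prefixed
// with that instance's number. instanceFor() is a SIMPLIFIED, hypothetical
// partitioner; Flink's real key-to-instance assignment uses its own hashing.
class PrintPrefixSketch {

    // Same key -> same instance index in [0, parallelism).
    static int instanceFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Mimics the sample output: "<instance number>> (user,bytes)", numbered from 1.
    static String format(String user, long bytes, int parallelism) {
        return (instanceFor(user, parallelism) + 1) + "> (" + user + "," + bytes + ")";
    }

    public static void main(String[] args) {
        // Two window results for the same user carry the same prefix.
        System.out.println(format("AnomieBOT", 155, 8));
        System.out.println(format("AnomieBOT", 12, 8));
    }
}
```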
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]({{ site.baseurl }}/apis/common/index.html) and the
    +[DataStream API]({{ site.baseurl }}/apis/streaming/index.html) if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
    +on your machine and refer to the [Kafka 
    +for setting up a Kafka installation before we proceed.
    +As a first step, we have to add the Flink Kafka connector as a dependency so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies section:
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +Next, we need to modify our program. We'll remove the `print()` sink and instead use a
    +Kafka sink. The new code looks like this:
    +{% highlight java %}
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    +{% endhighlight %}
    +Note how we first transform the stream of `Tuple2<String, Long>` to a stream of `String` using
    +a `MapFunction`. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup.
    +`"wiki-result"` is the name of the Kafka topic that we are going to create next, before running
    +our program.
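The sketch below shows, in plain Java, what happens to a result before it reaches Kafka: the map step turns the tuple into a string, and the serialization schema turns that string into the bytes of a Kafka message. `Pair` is a hypothetical stand-in for `Tuple2<String, Long>` whose `toString()` produces the same `(user,bytes)` shape as the printed output, and UTF-8 is an assumed encoding.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of the two steps before a record reaches Kafka:
// 1. the map step turns (user, bytes) into a String, and
// 2. a string schema turns that String into the byte[] message value.
// Pair is a hypothetical stand-in for Tuple2<String, Long>; UTF-8 is an assumption.
class KafkaStringSketch {

    static final class Pair {
        final String f0;
        final long f1;

        Pair(String f0, long f1) {
            this.f0 = f0;
            this.f1 = f1;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + ")"; // same shape as the printed output
        }
    }

    static String toLine(Pair p) {
        return p.toString();
    }

    static byte[] serialize(String line) {
        return line.getBytes(StandardCharsets.UTF_8);
    }

    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] message = serialize(toLine(new Pair("KasparBot", -245)));
        // prints: 16 bytes: (KasparBot,-245)
        System.out.println(message.length + " bytes: " + deserialize(message));
    }
}
```

Because the message value is just text, a console consumer can display it directly without knowing anything about Flink's tuple types.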
    +Build the project using Maven because we need the jar file for running on the cluster:
    +{% highlight bash %}
    +$ mvn clean package
    +{% endhighlight %}
    +The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
    +this later.
    +Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
    +to the location where you installed Flink and start a local cluster:
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/start-local.sh
    +{% endhighlight %}
    +We also have to create the Kafka topic, so that our program can write to it:
    +{% highlight bash %}
    +$ cd my/kafka/directory
    +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic 
    +{% endhighlight %}
    +Now we are ready to run our jar file on the local Flink cluster:
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wiki-edits-0.1.jar
    +{% endhighlight %}
    +The output of that command should look similar to this, if everything went according to plan:
    +03/08/2016 15:09:27 Job execution switched to status RUNNING.
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
    +You can see how the individual operators start running. There are only two, because
    +the operations after the window get folded into one operation for performance reasons. In Flink
    +we call this *chaining*.
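Conceptually, chaining means that instead of handing every record from one operator to the next through a buffer, the chained operators are fused into a single function call per record. The plain-Java sketch below contrasts the two; the class and method names are hypothetical and it does not model Flink's actual runtime.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;

// Conceptual sketch of operator chaining (hypothetical names, not Flink's runtime).
class ChainingSketch {

    // Unchained: the Map step writes into a buffer and a separate Sink step
    // reads from it, standing in for a hand-off between two tasks.
    static List<String> unchained(List<Long> windowResults, Function<Long, String> map) {
        Queue<String> buffer = new ArrayDeque<>();
        for (Long r : windowResults) {
            buffer.add(map.apply(r));
        }
        List<String> sink = new ArrayList<>();
        while (!buffer.isEmpty()) {
            sink.add(buffer.poll());
        }
        return sink;
    }

    // Chained: Map and Sink are fused into one function that is called
    // directly for every record, with no intermediate buffer.
    static List<String> chained(List<Long> windowResults, Function<Long, String> map) {
        List<String> sink = new ArrayList<>();
        Consumer<Long> fused = r -> sink.add(map.apply(r));
        windowResults.forEach(fused);
        return sink;
    }

    public static void main(String[] args) {
        List<Long> windowResults = Arrays.asList(114L, -3690L);
        Function<Long, String> toLine = bytes -> "(user," + bytes + ")";
        System.out.println(unchained(windowResults, toLine)); // prints [(user,114), (user,-3690)]
        System.out.println(chained(windowResults, toLine));   // prints [(user,114), (user,-3690)]
    }
}
```

Both variants produce the same records; the chained one simply skips the intermediate hand-off, which is where the performance benefit mentioned above comes from.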
    +You can observe the output of the program by inspecting the Kafka topic using the Kafka
    +console consumer:
    +{% highlight bash %}
    +bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic 
    +{% endhighlight %}
    +You can also check out the Flink dashboard which should be running at 
    +You get an overview of your cluster resources and running jobs:
    +<a href="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager-overview.png" ><img 
class="img-responsive" src="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager-overview.png" alt="JobManager 
    +<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" alt="Example Job View"/></a>
    +This concludes our little tour of Flink. If you have any questions, please 
don't hesitate to ask on our [Mailing 
    --- End diff --
    Let's link to the community mailing lists page so people can sign up easily?

