[ 
https://issues.apache.org/jira/browse/FLINK-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15185103#comment-15185103
 ] 

ASF GitHub Bot commented on FLINK-3591:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55376606
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program 
([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on 
Flink. 
    -On the way, you will see the a visualization of the program, the optimized 
execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and go from setting up a Flink project to running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. 
We are going to
    +read this channel in Flink and count the number of bytes that each user 
edits within
    +a given window of time. This is easy enough to implement in a few minutes 
using Flink but it will
    +give you a good foundation from which to start building more complex 
analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project 
structure. Please
    +see [Java API Quickstart]({{ site.baseurl 
}}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With 
the above parameters,
    +Maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +The root directory contains our `pom.xml` file, which already has the Flink dependencies added,
    +and `src/main/java` contains several example Flink programs. We can delete the example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
    +use it in our program. Edit the `dependencies` section of the `pom.xml` so that it looks like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project or open a text editor and
    +create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it in as we go. Note that I won't give
    +import statements here since IDEs can add them automatically. At the end of this section I'll show
    +the complete code with import statements if you simply want to skip ahead and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a 
`StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be 
used to set execution
    +parameters and create sources for reading from external systems. So let's go ahead and add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new 
WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
    +the purposes of this example we are interested in determining the number of added or removed
    +bytes that each user causes in a certain time window, let's say five seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is to say that operations
    +on this stream should take the key into account. In our case the summation of edited bytes in the
    +windows should be per unique user. For keying a stream we have to provide a `KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us a stream of the same `WikipediaEditEvent` elements, now keyed by a `String`: the user name.
    +We can now specify that we want to have windows imposed on this stream and 
compute some
    +result based on elements in these windows. We need to specify a window 
here because we are
    +dealing with an infinite stream of events. If you want to compute an 
aggregation on such an
    +infinite stream you never know when you are finished. That's where windows come into play:
    +they specify a time slice in which we should perform our computation. In 
our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, 
Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, 
WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specifies that we want to have tumbling (non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each 
window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` 
and add to it the byte
    +difference of every edit in that time window for a user. The resulting 
Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five 
seconds.
    +
    +The only thing left to do is print the stream to the console and start 
execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, such as creating
    +sources, transformations and sinks, only build up a graph of internal operations. Only when
    +`execute()` is called is this graph of operations submitted to a cluster or executed on your local
    +machine.
    +
    +The complete code so far is this:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +import org.apache.flink.api.common.functions.FoldFunction;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.KeyedStream;
    +import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    +import org.apache.flink.streaming.api.windowing.time.Time;
    +import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    +import 
org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    +
    +public class WikipediaAnalysis {
    +
    +  public static void main(String[] args) throws Exception {
    +
    +    StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +    DataStream<WikipediaEditEvent> edits = see.addSource(new 
WikipediaEditsSource());
    +
    +    KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +      .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +          return event.getUser();
    +        }
    +      });
    +
    +    DataStream<Tuple2<String, Long>> result = keyedEdits
    +      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +      .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, 
Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, 
WikipediaEditEvent event) {
    +          acc.f0 = event.getUser();
    +          acc.f1 += event.getByteDiff();
    +          return acc;
    +        }
    +      });
    +
    +    result.print();
    +
    +    see.execute();
    +  }
    +}
    +{% endhighlight %}
    +
    +You can run this example in your IDE or on the command line, using Maven:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
    +{% endhighlight %}
    +
    +The first command builds our project and the second executes our main class. The output should be
    +similar to this:
    +
    +{% highlight bash %}
    +1> (Fenix down,114)
    +6> (AnomieBOT,155)
    +8> (BD2412bot,-3690)
    +7> (IgnorantArmies,49)
    +3> (Ckh3111,69)
    +5> (Slade360,0)
    +7> (Narutolovehinata5,2195)
    +6> (Vuyisa2001,79)
    +4> (Ms Sarah Welch,269)
    +4> (KasparBot,-245)
    +{% endhighlight %}
    +
    +The number in front of each line tells you on which parallel instance of 
the print sink the output
    +was produced.
    +
    +This should get you started with writing your own Flink programs. You can check out our guides
    +about [basic concepts]({{ site.baseurl }}/apis/common/index.html) and the
    +[DataStream API]({{ site.baseurl }}/apis/streaming/index.html) if you want to learn more. Stick
    +around for the bonus exercise if you want to learn about setting up a Flink cluster on
    +your own machine and writing results to [Kafka](http://kafka.apache.org).
    +
    +## Bonus Exercise: Running on a Cluster and Writing to Kafka
    +
    +Please follow our [setup quickstart](setup_quickstart.html) for setting up 
a Flink distribution
    +on your machine and refer to the [Kafka 
quickstart](https://kafka.apache.org/documentation.html#quickstart)
    +for setting up a Kafka installation before we proceed.
    +
    +As a first step, we have to add the Flink Kafka connector as a dependency 
so that we can
    +use the Kafka sink. Add this to the `pom.xml` file in the dependencies 
section:
    +
    +{% highlight xml %}
    +<dependency>
    +    <groupId>org.apache.flink</groupId>
    +    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
    +    <version>${flink.version}</version>
    +</dependency>
    +{% endhighlight %}
    +
    +Next, we need to modify our program. We'll remove the `print()` sink and 
instead use a
    +Kafka sink. The new code looks like this:
    +
    +{% highlight java %}
    +
    +result
    +    .map(new MapFunction<Tuple2<String,Long>, String>() {
    +        @Override
    +        public String map(Tuple2<String, Long> tuple) {
    +            return tuple.toString();
    +        }
    +    })
    +    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", 
new SimpleStringSchema()));
    +{% endhighlight %}
    +
    +Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
    +a MapFunction. We are doing this because it is easier to write plain strings to Kafka. Then,
    +we create a Kafka sink. You might have to adapt the hostname and port to your setup. `"wiki-result"`
    +is the name of the Kafka topic that we are going to create next, before running our program.
    +Build the project using Maven because we need the jar file for running on 
the cluster:
    +
    +{% highlight bash %}
    +$ mvn clean package
    +{% endhighlight %}
    +
    +The resulting jar file will be in the `target` subfolder: 
`target/wiki-edits-0.1.jar`. We'll use
    +this later.
    +
    +Now we are ready to launch a Flink cluster and run the program that writes 
to Kafka on it. Go
    +to the location where you installed Flink and start a local cluster:
    +
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/start-local.sh
    +{% endhighlight %}
    +
    +We also have to create the Kafka topic, so that our program can write to it:
    +
    +{% highlight bash %}
    +$ cd my/kafka/directory
    +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-result
    +{% endhighlight %}
    +
    +Now we are ready to run our jar file on the local Flink cluster:
    +{% highlight bash %}
    +$ cd my/flink/directory
    +$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wiki-edits-0.1.jar
    +{% endhighlight %}
    +
    +The output of that command should look similar to this, if everything went 
according to plan:
    +
    +```
    +03/08/2016 15:09:27 Job execution switched to status RUNNING.
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), 
FoldingStateDescriptor{name=window-contents, defaultValue=(,0), 
serializer=null}, ProcessingTimeTrigger(), 
WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) 
switched to SCHEDULED
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), 
FoldingStateDescriptor{name=window-contents, defaultValue=(,0), 
serializer=null}, ProcessingTimeTrigger(), 
WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) 
switched to DEPLOYING
    +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), 
FoldingStateDescriptor{name=window-contents, defaultValue=(,0), 
serializer=null}, ProcessingTimeTrigger(), 
WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) 
switched to RUNNING
    +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING
    +```
    +
    +You can see how the individual operators start running. There are only two 
because
    +the operations after the window get folded into one operation for 
performance reasons. In Flink
    +we call this *chaining*.
    +
    +You can observe the output of the program by inspecting the Kafka topic 
using the Kafka
    +console consumer:
    +
    +{% highlight bash %}
    +$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
    +{% endhighlight %}
    +
    +You can also check out the Flink dashboard which should be running at 
[http://localhost:8081](http://localhost:8081).
    +You get an overview of your cluster resources and running jobs:
    +
    +<a href="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager-overview.png" ><img 
class="img-responsive" src="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager-overview.png" alt="JobManager 
Overview"/></a>
     
    -## Setup Flink
    -Follow the [instructions](setup_quickstart.html) to setup Flink and enter 
the root directory of your Flink setup.
    -
    -## Generate Input Data
    -Flink contains a data generator for K-Means.
    -
    -~~~bash
    -# Assuming you are in the root directory of your Flink setup
    -mkdir kmeans
    -cd kmeans
    -# Run data generator
    -java -cp ../examples/batch/KMeans.jar:../lib/flink-dist-{{ site.version 
}}.jar \
    -  org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
    -  -points 500 -k 10 -stddev 0.08 -output `pwd`
    -~~~
    -
    -The generator has the following arguments (arguments in `[]` are optional):
    -
    -~~~bash
    --points <num> -k <num clusters> [-output <output-path>] [-stddev <relative 
stddev>] [-range <centroid range>] [-seed <seed>]
    -~~~
    -
    -The _relative standard deviation_ is an interesting tuning parameter. It 
determines the closeness of the points to randomly generated centers.
    -
    -The `kmeans/` directory should now contain two files: `centers` and 
`points`. The `points` file contains the points to cluster and the `centers` 
file contains initial cluster centers.
    -
    -
    -## Inspect the Input Data
    -Use the `plotPoints.py` tool to review the generated data points. 
[Download Python Script](plotPoints.py)
    -
    -~~~ bash
    -python plotPoints.py points ./points input
    -~~~ 
    -
    -Note: You might have to install [matplotlib](http://matplotlib.org/) 
(`python-matplotlib` package on Ubuntu) to use the Python script.
    -
    -You can review the input data stored in the `input-plot.pdf`, for example 
with Evince (`evince input-plot.pdf`).
    -
    -The following overview presents the impact of the different standard 
deviations on the input data.
    -
    -|relative stddev = 0.03|relative stddev = 0.08|relative stddev = 0.15|
    -|:--------------------:|:--------------------:|:--------------------:|
    -|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans003.png" 
alt="example1" style="width: 275px;"/>|<img src="{{ site.baseurl 
}}/page/img/quickstart-example/kmeans008.png" alt="example2" style="width: 
275px;"/>|<img src="{{ site.baseurl 
}}/page/img/quickstart-example/kmeans015.png" alt="example3" style="width: 
275px;"/>|
    -
    -
    -## Start Flink
    -Start Flink and the web job submission client on your local machine.
    -
    -~~~ bash
    -# return to the Flink root directory
    -cd ..
    -# start Flink
    -./bin/start-local.sh
    -~~~
    -
    -## Inspect and Run the K-Means Example Program
    -The Flink web interface allows to submit Flink programs using a graphical 
user interface.
    -
    -<div class="row" style="padding-top:15px">
    -   <div class="col-md-6">
    -           <a data-lightbox="compiler" href="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager_kmeans_submit.png" 
data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager_kmeans_submit.png" /></a>
    -   </div>
    -   <div class="col-md-6">
    -           1. Open web interface on <a 
href="http://localhost:8081">localhost:8081</a> <br>
    -           2. Select the "Submit new Job" page in the menu <br>
    -           3. Upload the <code>KMeans.jar</code> from 
<code>examples/batch</code> by clicking the "Add New" button, and then the 
"Upload" button. <br>
    -           4. Select the <code>KMeans.jar</code> form the list of jobs <br>
    -           5. Enter the arguments and options in the lower box: <br>
    -               Leave the <i>Entry Class</i> and <i>Parallelism</i> form 
empty<br>
    -               Enter the following program arguments: <br>
    -               (KMeans expects the following args: <code>--points 
&lt;path&gt; --centroids &lt;path&gt; --output &lt;path&gt; --iterations 
&lt;n&gt;</code>
    -                   {% highlight bash %}--points /tmp/kmeans/points 
--centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10{% 
endhighlight %}<br>
    -           6. Press <b>Submit</b> to start the job
    -   </div>
    -</div>
    -<hr>
    -<div class="row" style="padding-top:15px">
    -   <div class="col-md-6">
    -           <a data-lightbox="compiler" href="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager_kmeans_execute.png" 
data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager_kmeans_execute.png" /></a>
    -   </div>
    -
    -   <div class="col-md-6">
    -           Watch the job executing.
    -   </div>
    -</div>
    -
    -
    -## Shutdown Flink
    -Stop Flink when you are done.
    -
    -~~~ bash
    -# stop Flink
    -./bin/stop-local.sh
    -~~~
    -
    -## Analyze the Result
    -Use the [Python Script](plotPoints.py) again to visualize the result.
    -
    -~~~bash
    -cd kmeans
    -python plotPoints.py result ./result clusters
    -~~~
    -
    -The following three pictures show the results for the sample input above. 
Play around with the parameters (number of iterations, number of clusters) to 
see how they affect the result.
    -
    -
    -|relative stddev = 0.03|relative stddev = 0.08|relative stddev = 0.15|
    -|:--------------------:|:--------------------:|:--------------------:|
    -|<img src="{{ site.baseurl }}/page/img/quickstart-example/result003.png" 
alt="example1" style="width: 275px;"/>|<img src="{{ site.baseurl 
}}/page/img/quickstart-example/result008.png" alt="example2" style="width: 
275px;"/>|<img src="{{ site.baseurl 
}}/page/img/quickstart-example/result015.png" alt="example3" style="width: 
275px;"/>|
    +If you click on your running job you will get a view where you can inspect 
individual operations
    +and, for example, see the number of processed elements:
    +
    +<a href="{{ site.baseurl 
}}/page/img/quickstart-example/jobmanager-job.png" ><img class="img-responsive" 
src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" 
alt="Example Job View"/></a>
     
    +This concludes our little tour of Flink. If you have any questions, please 
don't hesitate to ask on our [Mailing 
Lists](http://mail-archives.apache.org/mod_mbox/flink-user/).
    --- End diff --
    
    Let's link to the community mailing lists page so people can sign up easily?
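
For readers of this thread who want to see what the window-plus-fold step in the diff computes, here is a minimal plain-Java sketch with no Flink dependency. It is not how Flink implements windows. The class `WindowedFoldSketch`, the `Edit` type and the method `sumPerUserPerWindow` are hypothetical names introduced only for illustration, and the sketch assigns windows from a timestamp carried by each event, whereas the guide's `TumblingProcessingTimeWindows` uses the wall-clock time at which an element is processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedFoldSketch {

    /** Hypothetical stand-in for WikipediaEditEvent: a timestamp, a user and a byte diff. */
    public static final class Edit {
        final long timestampMillis;
        final String user;
        final long byteDiff;

        public Edit(long timestampMillis, String user, long byteDiff) {
            this.timestampMillis = timestampMillis;
            this.user = user;
            this.byteDiff = byteDiff;
        }
    }

    /**
     * Groups edits into tumbling (non-overlapping) windows of the given size and
     * sums the byte diffs per user inside each window.
     * Returns: window start (ms) -> (user -> summed byte diff).
     */
    public static Map<Long, Map<String, Long>> sumPerUserPerWindow(List<Edit> edits, long windowMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Edit e : edits) {
            // Align the timestamp to the start of its window.
            long windowStart = e.timestampMillis - Math.floorMod(e.timestampMillis, windowMillis);
            // The "key by user" and "fold" steps: accumulate the diff per user within the window.
            result.computeIfAbsent(windowStart, k -> new TreeMap<>())
                  .merge(e.user, e.byteDiff, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edit> edits = new ArrayList<>();
        edits.add(new Edit(1_000, "alice", 10));
        edits.add(new Edit(4_999, "alice", -3));
        edits.add(new Edit(5_000, "alice", 7));
        edits.add(new Edit(6_000, "bob", 42));

        // prints {0={alice=7}, 5000={alice=7, bob=42}}
        System.out.println(sumPerUserPerWindow(edits, 5_000));
    }
}
```

Each inner map entry corresponds to one `Tuple2<String, Long>` that the guide's job would emit for a user at the end of a five-second window. The real job produces these incrementally on an unbounded stream, while the sketch processes a finite list in one pass.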


> Replace Quickstart K-Means Example by Streaming Example
> -------------------------------------------------------
>
>                 Key: FLINK-3591
>                 URL: https://issues.apache.org/jira/browse/FLINK-3591
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>



