[ https://issues.apache.org/jira/browse/FLINK-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15185103#comment-15185103 ]
ASF GitHub Bot commented on FLINK-3591:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55376606

--- Diff: docs/quickstart/run_example_quickstart.md ---
@@ -27,116 +27,360 @@ under the License.
 * This will be replaced by the TOC
 {:toc}
-This guide walks you through the steps of executing an example program ([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on Flink.
-On the way, you will see the a visualization of the program, the optimized execution plan, and track the progress of its execution.
+In this guide we will start from scratch and go from setting up a Flink project to running
+a streaming analysis program on a Flink cluster.
+
+Wikipedia provides an IRC channel where all edits to the wiki are logged. We are going to
+read this channel in Flink and count the number of bytes that each user edits within
+a given window of time. This is easy enough to implement in a few minutes using Flink but it will
+give you a good foundation from which to start building more complex analysis programs on your own.
+
+## Setting up a Maven Project
+
+We are going to use a Flink Maven Archetype for creating our project structure. Please
+see [Java API Quickstart]({{ site.baseurl }}/quickstart/java_api_quickstart.html) for more details
+about this. For our purposes, the command to run is this:
+
+{% highlight bash %}
+$ mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-java \
+    -DarchetypeVersion=1.0.0 \
+    -DgroupId=wiki-edits \
+    -DartifactId=wiki-edits \
+    -Dversion=0.1 \
+    -Dpackage=wikiedits \
+    -DinteractiveMode=false
+{% endhighlight %}
+
+You can edit the `groupId`, `artifactId` and `package` if you like.
With the above parameters, Maven will create a project structure that looks like this:
+
+{% highlight bash %}
+$ tree wiki-edits
+wiki-edits/
+├── pom.xml
+└── src
+    └── main
+        ├── java
+        │   └── wikiedits
+        │       ├── Job.java
+        │       ├── SocketTextStreamWordCount.java
+        │       └── WordCount.java
+        └── resources
+            └── log4j.properties
+{% endhighlight %}
+
+In the root directory there is our `pom.xml` file, which already has the Flink dependencies added, and
+several example Flink programs in `src/main/java`. We can delete the example programs, since
+we are going to start from scratch:
+
+{% highlight bash %}
+$ rm wiki-edits/src/main/java/wikiedits/*.java
+{% endhighlight %}
+
+As a last step we need to add the Flink Wikipedia connector as a dependency so that we can
+use it in our program. Edit the `dependencies` section so that it looks like this:
+
+{% highlight xml %}
+<dependencies>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-java</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-streaming-java_2.10</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-clients_2.10</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+    <dependency>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-wikiedits_2.10</artifactId>
+        <version>${flink.version}</version>
+    </dependency>
+</dependencies>
+{% endhighlight %}
+
+Notice the `flink-connector-wikiedits_2.10` dependency that was added.
+
+## Writing a Flink Program
+
+It's coding time.
Fire up your favorite IDE and import the Maven project, or open a text editor and
+create the file `src/main/java/wikiedits/WikipediaAnalysis.java`:
+
+{% highlight java %}
+package wikiedits;
+
+public class WikipediaAnalysis {
+
+    public static void main(String[] args) throws Exception {
+
+    }
+}
+{% endhighlight %}
+
+I admit it's very bare bones now but we will fill it as we go. Note that I won't give
+import statements here since IDEs can add them automatically. At the end of this section I'll show
+the complete code with import statements if you simply want to skip ahead and enter that in your
+editor.
+
+The first step in a Flink program is to create a `StreamExecutionEnvironment`
+(or `ExecutionEnvironment` if you are writing a batch job). This can be used to set execution
+parameters and create sources for reading from external systems. So let's go ahead and add
+this to the main method:
+
+{% highlight java %}
+StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+{% endhighlight %}
+
+Next, we will create a source that reads from the Wikipedia IRC log:
+
+{% highlight java %}
+DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
+{% endhighlight %}
+
+This creates a `DataStream` of `WikipediaEditEvent` elements that we can further process. For
+the purposes of this example we are interested in determining the number of added or removed
+bytes that each user causes in a certain time window, let's say five seconds. For this we first
+have to specify that we want to key the stream on the user name, that is to say that operations
+on this stream should take the key into account. In our case the summation of edited bytes in the windows
+should be per unique user.
For keying a Stream we have to provide a `KeySelector`, like this:
+
+{% highlight java %}
+KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
+    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
+        @Override
+        public String getKey(WikipediaEditEvent event) {
+            return event.getUser();
+        }
+    });
+{% endhighlight %}
+
+This gives us a Stream of `WikipediaEditEvent` that has a `String` key, the user name.
+We can now specify that we want to have windows imposed on this stream and compute some
+result based on elements in these windows. We need to specify a window here because we are
+dealing with an infinite stream of events. If you want to compute an aggregation on such an
+infinite stream you never know when you are finished. That's where windows come into play:
+they specify a time slice in which we should perform our computation. In our example we will say
+that we want to aggregate the sum of edited bytes every five seconds:
+
+{% highlight java %}
+DataStream<Tuple2<String, Long>> result = keyedEdits
+    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+        @Override
+        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) {
+            acc.f0 = event.getUser();
+            acc.f1 += event.getByteDiff();
+            return acc;
+        }
+    });
+{% endhighlight %}
+
+The first call, `.window()`, specifies that we want to have tumbling (non-overlapping) windows
+of five seconds. The second call specifies a *Fold transformation* on each window slice for
+each unique key. In our case we start from an initial value of `("", 0L)` and add to it the byte
+difference of every edit in that time window for a user. The resulting Stream now contains
+a `Tuple2<String, Long>` for every user, which gets emitted every five seconds.
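To make the per-window fold concrete, here is a small plain-Java sketch. It uses no Flink APIs; the class and method names, the user name and the byte-diff values are made up for illustration. It only mirrors the accumulation that the `FoldFunction` performs for one user's window:

```java
public class WindowFoldSketch {

    // Mirrors the FoldFunction for a single key and a single window:
    // start from 0 and add each edit's byte diff.
    static long foldByteDiffs(long[] byteDiffs) {
        long sum = 0L;
        for (long diff : byteDiffs) {
            sum += diff;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical byte diffs of one user's edits within one five-second window.
        long[] window = {120, -30, 15};
        // prints (ExampleUser,105)
        System.out.println("(ExampleUser," + foldByteDiffs(window) + ")");
    }
}
```

In the actual job this accumulation runs independently per user and per five-second window, with the user name carried along in the tuple's `f0` field.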
+
+The only thing left to do is print the stream to the console and start execution:
+
+{% highlight java %}
+result.print();
+
+see.execute();
+{% endhighlight %}
+
+That last call is necessary to start the actual Flink job. All operations, such as creating
+sources, transformations and sinks, only build up a graph of internal operations. Only when
+`execute()` is called is this graph of operations submitted to a cluster or executed on your local
+machine.
+
+The complete code so far is this:
+
+{% highlight java %}
+package wikiedits;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
+import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
+
+public class WikipediaAnalysis {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<WikipediaEditEvent> edits = see.addSource(new WikipediaEditsSource());
+
+        KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
+            .keyBy(new KeySelector<WikipediaEditEvent, String>() {
+                @Override
+                public String getKey(WikipediaEditEvent event) {
+                    return event.getUser();
+                }
+            });
+
+        DataStream<Tuple2<String, Long>> result = keyedEdits
+            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+            .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() {
+                @Override
+                public Tuple2<String, Long> fold(Tuple2<String, Long> acc,
WikipediaEditEvent event) {
+                    acc.f0 = event.getUser();
+                    acc.f1 += event.getByteDiff();
+                    return acc;
+                }
+            });
+
+        result.print();
+
+        see.execute();
+    }
+}
+{% endhighlight %}
+
+You can run this example in your IDE or on the command line, using Maven:
+
+{% highlight bash %}
+$ mvn clean package
+$ mvn exec:java -Dexec.mainClass=wikiedits.WikipediaAnalysis
+{% endhighlight %}
+
+The first command builds our project and the second executes our main class. The output should be
+similar to this:
+
+{% highlight bash %}
+1> (Fenix down,114)
+6> (AnomieBOT,155)
+8> (BD2412bot,-3690)
+7> (IgnorantArmies,49)
+3> (Ckh3111,69)
+5> (Slade360,0)
+7> (Narutolovehinata5,2195)
+6> (Vuyisa2001,79)
+4> (Ms Sarah Welch,269)
+4> (KasparBot,-245)
+{% endhighlight %}
+
+The number in front of each line tells you on which parallel instance of the print sink the output
+was produced.
+
+This should get you started with writing your own Flink programs. You can check out our guides
+about [basic concepts]({{ site.baseurl }}/apis/common/index.html) and the
+[DataStream API]({{ site.baseurl }}/apis/streaming/index.html) if you want to learn more. Stick
+around for the bonus exercise if you want to learn about setting up a Flink cluster on
+your own machine and writing results to [Kafka](http://kafka.apache.org).
+
+## Bonus Exercise: Running on a Cluster and Writing to Kafka
+
+Please follow our [setup quickstart](setup_quickstart.html) for setting up a Flink distribution
+on your machine and refer to the [Kafka quickstart](https://kafka.apache.org/documentation.html#quickstart)
+for setting up a Kafka installation before we proceed.
+
+As a first step, we have to add the Flink Kafka connector as a dependency so that we can
+use the Kafka sink.
Add this to the `pom.xml` file in the dependencies section:
+
+{% highlight xml %}
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
+    <version>${flink.version}</version>
+</dependency>
+{% endhighlight %}
+
+Next, we need to modify our program. We'll remove the `print()` sink and instead use a
+Kafka sink. The new code looks like this:
+
+{% highlight java %}
+result
+    .map(new MapFunction<Tuple2<String, Long>, String>() {
+        @Override
+        public String map(Tuple2<String, Long> tuple) {
+            return tuple.toString();
+        }
+    })
+    .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new SimpleStringSchema()));
+{% endhighlight %}
+
+Note how we first transform the Stream of `Tuple2<String, Long>` to a Stream of `String` using
+a `MapFunction`. We are doing this because it is easier to write plain strings to Kafka. Then,
+we create a Kafka sink. You might have to adapt the hostname and port to your setup. `"wiki-result"`
+is the name of the Kafka topic that we are going to create next, before running our program.
+Build the project using Maven because we need the jar file for running on the cluster:
+
+{% highlight bash %}
+$ mvn clean package
+{% endhighlight %}
+
+The resulting jar file will be in the `target` subfolder: `target/wiki-edits-0.1.jar`. We'll use
+this later.
+
+Now we are ready to launch a Flink cluster and run the program that writes to Kafka on it. Go
Go +to the location where you installed Flink and start a local cluster: + +{% highlight bash %} +$ cd my/flink/directory +$ bin/start-local.sh +{% endhighlight %} + +We also have to create the Kafka Topic, so that we our program can write to it: + +{% highlight bash %} +$ cd my/kafka/directory +$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic wiki-results +{% endhighlight %} + +Now we are ready to run our jar file on the local Flink cluster: +{% highlight bash %} +$ cd my/flink/directory +$ bin/flink run -c wikiedits.WikipediaAnalysis path/to/wikiedits-0.1.jar +{% endhighlight %} + +The output of that command should look similar to this, if everything went according to plan: + +``` +03/08/2016 15:09:27 Job execution switched to status RUNNING. +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to SCHEDULED +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to DEPLOYING +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to SCHEDULED +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to DEPLOYING +03/08/2016 15:09:27 TriggerWindow(TumblingProcessingTimeWindows(5000), FoldingStateDescriptor{name=window-contents, defaultValue=(,0), serializer=null}, ProcessingTimeTrigger(), WindowedStream.fold(WindowedStream.java:207)) -> Map -> Sink: Unnamed(1/1) switched to RUNNING +03/08/2016 15:09:27 Source: Custom Source(1/1) switched to RUNNING +``` + +You can see how the individual operators start running. There are only two because +the operations after the window get folded into one operation for performance reasons. 
In Flink
+we call this *chaining*.
+
+You can observe the output of the program by inspecting the Kafka topic using the Kafka
+console consumer:
+
+{% highlight bash %}
+bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wiki-result
+{% endhighlight %}
+
+You can also check out the Flink dashboard, which should be running at [http://localhost:8081](http://localhost:8081).
+You get an overview of your cluster resources and running jobs:
+
+<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-overview.png" alt="JobManager Overview"/></a>
-## Setup Flink
-Follow the [instructions](setup_quickstart.html) to setup Flink and enter the root directory of your Flink setup.
-
-## Generate Input Data
-Flink contains a data generator for K-Means.
-
-~~~bash
-# Assuming you are in the root directory of your Flink setup
-mkdir kmeans
-cd kmeans
-# Run data generator
-java -cp ../examples/batch/KMeans.jar:../lib/flink-dist-{{ site.version }}.jar \
-  org.apache.flink.examples.java.clustering.util.KMeansDataGenerator \
-  -points 500 -k 10 -stddev 0.08 -output `pwd`
-~~~
-
-The generator has the following arguments (arguments in `[]` are optional):
-
-~~~bash
--points <num> -k <num clusters> [-output <output-path>] [-stddev <relative stddev>] [-range <centroid range>] [-seed <seed>]
-~~~
-
-The _relative standard deviation_ is an interesting tuning parameter. It determines the closeness of the points to randomly generated centers.
-
-The `kmeans/` directory should now contain two files: `centers` and `points`. The `points` file contains the points to cluster and the `centers` file contains initial cluster centers.
-
-
-## Inspect the Input Data
-Use the `plotPoints.py` tool to review the generated data points.
[Download Python Script](plotPoints.py)
-
-~~~ bash
-python plotPoints.py points ./points input
-~~~
-
-Note: You might have to install [matplotlib](http://matplotlib.org/) (`python-matplotlib` package on Ubuntu) to use the Python script.
-
-You can review the input data stored in the `input-plot.pdf`, for example with Evince (`evince input-plot.pdf`).
-
-The following overview presents the impact of the different standard deviations on the input data.
-
-|relative stddev = 0.03|relative stddev = 0.08|relative stddev = 0.15|
-|:--------------------:|:--------------------:|:--------------------:|
-|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans003.png" alt="example1" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans008.png" alt="example2" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/kmeans015.png" alt="example3" style="width: 275px;"/>|
-
-
-## Start Flink
-Start Flink and the web job submission client on your local machine.
-
-~~~ bash
-# return to the Flink root directory
-cd ..
-# start Flink
-./bin/start-local.sh
-~~~
-
-## Inspect and Run the K-Means Example Program
-The Flink web interface allows to submit Flink programs using a graphical user interface.
-
-<div class="row" style="padding-top:15px">
-  <div class="col-md-6">
-    <a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_submit.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_submit.png" /></a>
-  </div>
-  <div class="col-md-6">
-    1. Open web interface on <a href="http://localhost:8081">localhost:8081</a> <br>
-    2. Select the "Submit new Job" page in the menu <br>
-    3. Upload the <code>KMeans.jar</code> from <code>examples/batch</code> by clicking the "Add New" button, and then the "Upload" button. <br>
-    4. Select the <code>KMeans.jar</code> form the list of jobs <br>
-    5.
Enter the arguments and options in the lower box: <br>
-      Leave the <i>Entry Class</i> and <i>Parallelism</i> form empty<br>
-      Enter the following program arguments: <br>
-      (KMeans expects the following args: <code>--points <path> --centroids <path> --output <path> --iterations <n></code>
-      {% highlight bash %}--points /tmp/kmeans/points --centroids /tmp/kmeans/centers --output /tmp/kmeans/result --iterations 10{% endhighlight %}<br>
-      6. Press <b>Submit</b> to start the job
-  </div>
-</div>
-<hr>
-<div class="row" style="padding-top:15px">
-  <div class="col-md-6">
-    <a data-lightbox="compiler" href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_execute.png" data-lightbox="example-1"><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager_kmeans_execute.png" /></a>
-  </div>
-
-  <div class="col-md-6">
-    Watch the job executing.
-  </div>
-</div>
-
-
-## Shutdown Flink
-Stop Flink when you are done.
-
-~~~ bash
-# stop Flink
-./bin/stop-local.sh
-~~~
-
-## Analyze the Result
-Use the [Python Script](plotPoints.py) again to visualize the result.
-
-~~~bash
-cd kmeans
-python plotPoints.py result ./result clusters
-~~~
-
-The following three pictures show the results for the sample input above. Play around with the parameters (number of iterations, number of clusters) to see how they affect the result.
-
-
-|relative stddev = 0.03|relative stddev = 0.08|relative stddev = 0.15|
-|:--------------------:|:--------------------:|:--------------------:|
-|<img src="{{ site.baseurl }}/page/img/quickstart-example/result003.png" alt="example1" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/result008.png" alt="example2" style="width: 275px;"/>|<img src="{{ site.baseurl }}/page/img/quickstart-example/result015.png" alt="example3" style="width: 275px;"/>|
+If you click on your running job you will get a view where you can inspect individual operations
+and, for example, see the number of processed elements:
+
+<a href="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" ><img class="img-responsive" src="{{ site.baseurl }}/page/img/quickstart-example/jobmanager-job.png" alt="Example Job View"/></a>
+
+This concludes our little tour of Flink. If you have any questions, please don't hesitate to ask on our [Mailing Lists](http://mail-archives.apache.org/mod_mbox/flink-user/).

--- End diff --

Let's link to the community mailing lists page so people can sign up easily?

> Replace Quickstart K-Means Example by Streaming Example
> -------------------------------------------------------
>
>                 Key: FLINK-3591
>                 URL: https://issues.apache.org/jira/browse/FLINK-3591
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)