[ 
https://issues.apache.org/jira/browse/FLINK-3591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15185051#comment-15185051
 ] 

ASF GitHub Bot commented on FLINK-3591:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1774#discussion_r55371997
  
    --- Diff: docs/quickstart/run_example_quickstart.md ---
    @@ -27,116 +27,360 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -This guide walks you through the steps of executing an example program 
([K-Means clustering](http://en.wikipedia.org/wiki/K-means_clustering)) on 
Flink. 
    -On the way, you will see the a visualization of the program, the optimized 
execution plan, and track the progress of its execution.
    +In this guide we will start from scratch and fo from setting up a Flink 
project and running
    +a streaming analysis program on a Flink cluster.
    +
    +Wikipedia provides an IRC channel where all edits to the wiki are logged. 
We are going to
    +read this channel in Flink and count the number of bytes that each user 
edits within
    +a given window of time. This is easy enough to implement in a few minutes 
using Flink but it will
    +give you a good foundation from which to start building more complex 
analysis programs on your own.
    +
    +## Setting up a Maven Project
    +
    +We are going to use a Flink Maven Archetype for creating our project 
stucture. Please
    +see [Java API Quickstart]({{ site.baseurl 
}}/quickstart/java_api_quickstart.html) for more details
    +about this. For our purposes, the command to run is this:
    +
    +{% highlight bash %}
    +$ mvn archetype:generate\
    +    -DarchetypeGroupId=org.apache.flink\
    +    -DarchetypeArtifactId=flink-quickstart-java\
    +    -DarchetypeVersion=1.0.0\
    +    -DgroupId=wiki-edits\
    +    -DartifactId=wiki-edits\
    +    -Dversion=0.1\
    +    -Dpackage=wikiedits\
    +    -DinteractiveMode=false\
    +{% endhighlight %}
    +
    +You can edit the `groupId`, `artifactId` and `package` if you like. With 
the above parameters,
    +maven will create a project structure that looks like this:
    +
    +{% highlight bash %}
    +$ tree wiki-edits
    +wiki-edits/
    +├── pom.xml
    +└── src
    +    └── main
    +        ├── java
    +        │   └── wikiedits
    +        │       ├── Job.java
    +        │       ├── SocketTextStreamWordCount.java
    +        │       └── WordCount.java
    +        └── resources
    +            └── log4j.properties
    +{% endhighlight %}
    +
    +There is our `pom.xml` file that already has the Flink dependencies added 
in the root directory and
    +several example Flink programs in `src/main/java`. We can delete the 
example programs, since
    +we are going to start from scratch:
    +
    +{% highlight bash %}
    +$ rm wiki-edits/src/main/java/wikiedits/*.java
    +{% endhighlight %}
    +
    +As a last step we need to add the Flink wikipedia connector as a 
dependency so that we can
    +use it in our program. Edit the `dependencies` section so that it looks 
like this:
    +
    +{% highlight xml %}
    +<dependencies>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-java</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-java_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-clients_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +    <dependency>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-connector-wikiedits_2.10</artifactId>
    +        <version>${flink.version}</version>
    +    </dependency>
    +</dependencies>
    +{% endhighlight %}
    +
    +Notice the `flink-connector-wikiedits_2.10` dependency that was added.
    +
    +## Writing a Flink Program
    +
    +It's coding time. Fire up your favorite IDE and import the Maven project 
or open a text editor and
    +create the file `src/main/wikiedits/WikipediaAnalysis.java`:
    +
    +{% highlight java %}
    +package wikiedits;
    +
    +public class WikipediaAnalysis {
    +
    +    public static void main(String[] args) throws Exception {
    +
    +    }
    +}
    +{% endhighlight %}
    +
    +I admit it's very bare bones now but we will fill it as we go. Note, that 
I'll not give
    +import statements here since IDEs can add them automatically. At the end 
of this section I'll show
    +the complete code with import statements if you simply want to skip ahead 
and enter that in your
    +editor.
    +
    +The first step in a Flink program is to create a 
`StreamExecutionEnvironment`
    +(or `ExecutionEnvironment` if you are writing a batch job). This can be 
used to set execution
    +parameters and create sources for reading from external systems. So let's 
go ahead, add
    +this to the main method:
    +
    +{% highlight java %}
    +StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
    +{% endhighlight %}
    +
    +Next, we will create a source that reads from the Wikipedia IRC log:
    +
    +{% highlight java %}
    +DataStream<WikipediaEditEvent> edits = see.addSource(new 
WikipediaEditsSource());
    +{% endhighlight %}
    +
    +This creates a `DataStream` of `WikipediaEditEvent` elements that we can 
further process. For
    +the purposes of this example we are interrested in determining the number 
of added or removed
    +bytes that each user causes in a certain time window, let's say five 
seconds. For this we first
    +have to specify that we want to key the stream on the user name, that is 
to say that operations
    +on this should take the key into account. In our case the summation of 
edited bytes in the windows
    +should be per unique user. For keying a Stream we have to provide a 
`KeySelector`, like this:
    +
    +{% highlight java %}
    +KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
    +    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
    +        @Override
    +        public String getKey(WikipediaEditEvent event) {
    +            return event.getUser();
    +        }
    +    });
    +{% endhighlight %}
    +
    +This gives us the same Stream of `WikipediaEditEvent` that has a `String` 
key that is the user.
    +We can now specify that we want to have windows imposed on this stream and 
compute some
    +result based on elements in these windows. We need to specify a window 
here because we are
    +dealing with an infinite stream of events. If you want to compute an 
aggregation on such an
    +infinite stream you never know when you are finished. That's where windows 
come into play,
    +they specify a time slice in which we should perform our computation. In 
our example we will say
    +that we want to aggregate the sum of edited bytes for every five seconds:
    +
    +{% highlight java %}
    +DataStream<Tuple2<String, Long>> result = keyedEdits
    +    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    +    .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, 
Tuple2<String, Long>>() {
    +        @Override
    +        public Tuple2<String, Long> fold(Tuple2<String, Long> acc, 
WikipediaEditEvent event) {
    +            acc.f0 = event.getUser();
    +            acc.f1 += event.getByteDiff();
    +            return acc;
    +        }
    +    });
    +{% endhighlight %}
    +
    +The first call, `.window()`, specified that we want to have tumbling 
(non-overlapping) windows
    +of five seconds. The second call specifies a *Fold transformation* on each 
window slice for
    +each unique key. In our case we start from an initial value of `("", 0L)` 
and add to it the byte
    +difference of every edit in that time window for a user. The resulting 
Stream now contains
    +a `Tuple2<String, Long>` for every user which gets emitted every five 
seconds.
    +
    +The only thing left to do is print the stream to the console and start 
execution:
    +
    +{% highlight java %}
    +result.print();
    +
    +see.execute();
    +{% endhighlight %}
    +
    +That last call is necessary to start the actual Flink job. All operations, 
such as creating
    +sources, transformations and sinks only build up a graph of internal 
operations. Only when
    +`execute()` is called is this graph of operations thrown on a cluster or 
executed on your local
    --- End diff --
    
    I wouldn't, because as you said it complicates stuff.


> Replace Quickstart K-Means Example by Streaming Example
> -------------------------------------------------------
>
>                 Key: FLINK-3591
>                 URL: https://issues.apache.org/jira/browse/FLINK-3591
>             Project: Flink
>          Issue Type: Improvement
>          Components: Documentation
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to