http://git-wip-us.apache.org/repos/asf/flink/blob/ad267a4b/docs/apis/streaming/index.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
new file mode 100644
index 0000000..06c0014
--- /dev/null
+++ b/docs/apis/streaming/index.md
@@ -0,0 +1,3306 @@
+---
+title: "Flink DataStream API Programming Guide"
+
+# Top-level navigation
+top-nav-group: apis
+top-nav-pos: 1
+top-nav-title: <strong>Streaming Guide</strong> (DataStream API)
+
+# Sub-level navigation
+sub-nav-group: streaming
+sub-nav-group-title: Streaming Guide
+sub-nav-pos: 1
+sub-nav-title: DataStream API
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+DataStream programs in Flink are regular programs that implement transformations on data streams
+(e.g., filtering, updating state, defining windows, aggregating). The data streams are initially
+created from various sources (e.g., message queues, socket streams, files). Results are returned
+via sinks, which may, for example, write the data to files or to standard output (such as the
+command line terminal). Flink programs run in a variety of contexts: standalone, or embedded in
+other programs. The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink DataStream program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+
+* This will be replaced by the TOC
+{:toc}
+
+
+Example Program
+---------------
+
+The following program is a complete, working example of a streaming window word count application
+that counts the words coming from a web socket in 5-second windows. You can copy & paste the code
+to run it locally.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Collector;
+
+public class WindowWordCount {
+
+    public static void main(String[] args) throws Exception {
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        DataStream<Tuple2<String, Integer>> dataStream = env
+                .socketTextStream("localhost", 9999)
+                .flatMap(new Splitter())
+                .keyBy(0)
+                .timeWindow(Time.of(5, TimeUnit.SECONDS))
+                .sum(1);
+
+        dataStream.print();
+
+        env.execute("Window WordCount");
+    }
+
+    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
+        @Override
+        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
+            for (String word: sentence.split(" ")) {
+                out.collect(new Tuple2<String, Integer>(word, 1));
+            }
+        }
+    }
+
+}
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.util.concurrent.TimeUnit
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.windowing.time.Time
+
+object WindowWordCount {
+  def main(args: Array[String]) {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val text = env.socketTextStream("localhost", 9999)
+
+    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+      .map { (_, 1) }
+      .keyBy(0)
+      .timeWindow(Time.of(5, TimeUnit.SECONDS))
+      .sum(1)
+
+    counts.print
+
+    env.execute("Window Stream WordCount")
+  }
+}
+{% endhighlight %}
+</div>
+
+</div>
+
+To run the example program, start the input stream with netcat first from a terminal:
+
+~~~bash
+nc -lk 9999
+~~~
+
+Just type some words, hitting return for a new word. These words will be the input to the
+word count program. If you want to see counts greater than 1, type the same word again and again within
+5 seconds (increase the window size from 5 seconds if you cannot type that fast ☺).
+
+{% top %}
+
+
+Linking with Flink
+------------------
+
+To write programs with Flink, you need to include the Flink DataStream library corresponding to
+your programming language in your project.
+
+The simplest way to do this is to use one of the quickstart scripts: either for
+[Java]({{ site.baseurl }}/quickstart/java_api_quickstart.html) or for [Scala]({{ site.baseurl }}/quickstart/scala_api_quickstart.html). They
+create a blank project from a template (a Maven Archetype), which sets up everything for you. To
+manually create the project, you can use the archetype and create a project by calling:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-java \
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight bash %}
+mvn archetype:generate \
+    -DarchetypeGroupId=org.apache.flink \
+    -DarchetypeArtifactId=flink-quickstart-scala \
+    -DarchetypeVersion={{site.version }}
+{% endhighlight %}
+</div>
+</div>
+
+The archetypes work for stable releases and preview versions (`-SNAPSHOT`).
+
+If you want to add Flink to an existing Maven project, add the following entry to your
+*dependencies* section in the *pom.xml* file of your project:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-scala</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+</div>
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations).
+
+{% top %}
+
+Program Skeleton
+----------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+As presented in the [example](#example-program), Flink DataStream programs look like regular Java
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtaining a `StreamExecutionEnvironment`,
+2. Connecting to data stream sources,
+3. Specifying transformations on the data streams,
+4. Specifying output for the processed data,
+5. Executing the program.
+
+We will now give an overview of each of those steps; please refer to the respective sections for
+more details.
+
+The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
+obtain one using these static methods on class `StreamExecutionEnvironment`:
+
+{% highlight java %}
+getExecutionEnvironment()
+
+createLocalEnvironment()
+createLocalEnvironment(int parallelism)
+createLocalEnvironment(int parallelism, Configuration customConfiguration)
+
+createRemoteEnvironment(String host, int port, String... jarFiles)
+createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment()`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular Java program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command line]({{ site.baseurl }}/apis/cli.html)
+or the [web interface]({{ site.baseurl }}/apis/web_client.html),
+the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
+The execution environment provides several methods for specifying data sources that read
+from files, sockets, and external systems. To just read
+data from a socket (also useful for debugging), you can use:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> lines = env.socketTextStream("localhost", 9999);
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a socket, transform again,
+combine with other DataStreams, or push to an external system (e.g., a message queue or a file system).
+You apply transformations by calling
+methods on DataStream with your own custom transformation functions. For example,
+a map transformation looks like this:
+
+{% highlight java %}
+DataStream<String> input = ...;
+
+DataStream<Integer> intValues = input.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String value) {
+        return Integer.parseInt(value);
+    }
+});
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+stream to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataStream containing your final results, you can push the result
+to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
+or print it.
+
+{% highlight java %}
+writeAsText(String path, ...)
+writeAsCsv(String path, ...)
+writeToSocket(String hostname, int port, ...)
+
+print()
+
+addSink(...)
+{% endhighlight %}
+
+Once you have specified the complete program you need to **trigger the program execution** by
+calling `execute()` on `StreamExecutionEnvironment`. This will either execute on
+the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+
+{% highlight java %}
+env.execute();
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+<br />
+
+As presented in the [example](#example-program), Flink DataStream programs look like regular Scala
+programs with a `main()` method. Each program consists of the same basic parts:
+
+1. Obtaining a `StreamExecutionEnvironment`,
+2. Connecting to data stream sources,
+3. Specifying transformations on the data streams,
+4. Specifying output for the processed data,
+5. Executing the program.
+
+We will now give an overview of each of those steps; please refer to the respective sections for
+more details.
+
+The `StreamExecutionEnvironment` is the basis for all Flink DataStream programs. You can
+obtain one using these static methods on class `StreamExecutionEnvironment`:
+
+{% highlight scala %}
+def getExecutionEnvironment
+
+def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors())
+
+def createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
+def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*)
+{% endhighlight %}
+
+Typically, you only need to use `getExecutionEnvironment`, since this
+will do the right thing depending on the context: if you are executing
+your program inside an IDE or as a regular program it will create
+a local environment that will execute your program on your local machine. If
+you created a JAR file from your program, and invoke it through the [command line](cli.html)
+or the [web interface](web_client.html),
+the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return
+an execution environment for executing your program on a cluster.
+
+The execution environment provides several methods for specifying data sources that read
+from files, sockets, and external systems.
+To just read data from a socket (also useful for debugging), you can use:
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val lines = env.socketTextStream("localhost", 9999)
+{% endhighlight %}
+
+This will give you a DataStream on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataStream you can apply transformations to create a new
+DataStream which you can then write to a file, transform again,
+combine with other DataStreams, or push to an external system.
+You apply transformations by calling
+methods on DataStream with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val mapped = input.map { x => x.toInt }
+{% endhighlight %}
+
+This will create a new DataStream by converting every String in the original
+stream to an Integer. For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataStream containing your final results, you can push the result
+to an external system (HDFS, Kafka, Elasticsearch), write it to a socket, write to a file,
+or print it.
+
+{% highlight scala %}
+writeAsText(path: String, ...)
+writeAsCsv(path: String, ...)
+writeToSocket(hostname: String, port: Int, ...)
+
+print()
+
+addSink(...)
+{% endhighlight %}
+
+Once you have specified the complete program you need to **trigger the program execution** by
+calling `execute` on `StreamExecutionEnvironment`. This will either execute on
+the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
+
+{% highlight scala %}
+env.execute()
+{% endhighlight %}
+
+</div>
+</div>
+
+{% top %}
+
+DataStream Abstraction
+----------------------
+
+A `DataStream` is a possibly unbounded, immutable collection of data items of the same type.
+
+Transformations may return different subtypes of `DataStream`, allowing specialized transformations.
+For example, the `keyBy(...)` method returns a `KeyedStream`, which is a stream of data that
+is logically partitioned by a certain key and can be further windowed.
+
+{% top %}
+
+Lazy Evaluation
+---------------
+
+All Flink DataStream programs are executed lazily: When the program's main method is executed, the data loading
+and transformations do not happen directly. Rather, each operation is created and added to the
+program's plan. The operations are actually executed when the execution is explicitly triggered by
+an `execute()` call on the `StreamExecutionEnvironment` object. Whether the program is executed locally
+or on a cluster depends on the type of `StreamExecutionEnvironment`.
+
+The lazy evaluation lets you construct sophisticated programs that Flink executes as one
+holistically planned unit.
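+
+For example, in the following minimal sketch (assuming, as in the earlier examples, a text
+source on a local socket), the source and transformation calls only build up the plan;
+no data is read or processed until the final `execute()` call:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// These calls only add operations to the program plan; nothing runs yet.
+DataStream<String> lines = env.socketTextStream("localhost", 9999);
+DataStream<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
+    @Override
+    public Integer map(String line) {
+        return line.length();
+    }
+});
+lengths.print();
+
+// Only this call triggers the actual (local or cluster) execution.
+env.execute("Lazy evaluation sketch");
+{% endhighlight %}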
+
+{% top %}
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
+multiple transformations into sophisticated topologies.
+
+This section gives a description of all the available transformations.
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Map</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Takes one element and produces one element. A map function that doubles the values of the input stream:</p>
+        {% highlight java %}
+DataStream<Integer> dataStream = //...
+dataStream.map(new MapFunction<Integer, Integer>() {
+    @Override
+    public Integer map(Integer value) throws Exception {
+        return 2 * value;
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
+        {% highlight java %}
+dataStream.flatMap(new FlatMapFunction<String, String>() {
+    @Override
+    public void flatMap(String value, Collector<String> out)
+        throws Exception {
+        for(String word: value.split(" ")){
+            out.collect(word);
+        }
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Filter</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+        A filter that filters out zero values:
+        </p>
+        {% highlight java %}
+dataStream.filter(new FilterFunction<Integer>() {
+    @Override
+    public boolean filter(Integer value) throws Exception {
+        return value != 0;
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>KeyBy</strong><br>DataStream → KeyedStream</td>
+      <td>
+        <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
+        Internally, this is implemented with hash partitioning. See <a href="#specifying-keys">keys</a> on how to specify keys.
+        This transformation returns a KeyedStream.</p>
+        {% highlight java %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Reduce</strong><br>KeyedStream → DataStream</td>
+      <td>
+        <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
+        emits the new value.</p>
+        <p>A reduce function that creates a stream of partial sums:</p>
+        {% highlight java %}
+keyedStream.reduce(new ReduceFunction<Integer>() {
+    @Override
+    public Integer reduce(Integer value1, Integer value2)
+        throws Exception {
+        return value1 + value2;
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Fold</strong><br>KeyedStream → DataStream</td>
+      <td>
+        <p>A "rolling" fold on a keyed data stream with an initial value.
+        Combines the current element with the last folded value and
+        emits the new value.</p>
+        <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+        emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+        {% highlight java %}
+DataStream<String> result =
+  keyedStream.fold("start", new FoldFunction<Integer, String>() {
+    @Override
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+  });
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Aggregations</strong><br>KeyedStream → DataStream</td>
+      <td>
+        <p>Rolling aggregations on a keyed data stream. The difference between min
+        and minBy is that min returns the minimum value, whereas minBy returns
+        the element that has the minimum value in this field (same for max and maxBy).</p>
+        {% highlight java %}
+keyedStream.sum(0);
+keyedStream.sum("key");
+keyedStream.min(0);
+keyedStream.min("key");
+keyedStream.max(0);
+keyedStream.max("key");
+keyedStream.minBy(0);
+keyedStream.minBy("key");
+keyedStream.maxBy(0);
+keyedStream.maxBy("key");
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
+        key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+        See <a href="#windows">windows</a> for a complete description of windows.</p>
+        {% highlight java %}
+dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>WindowAll</strong><br>DataStream → AllWindowedStream</td>
+      <td>
+        <p>Windows can be defined on regular DataStreams. Windows group all the stream events
+        according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+        See <a href="#windows">windows</a> for a complete description of windows.</p>
+        <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
+        gathered in one task for the windowAll operator.</p>
+        {% highlight java %}
+dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))); // Last 5 seconds of data
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Apply</strong><br>WindowedStream → DataStream<br>AllWindowedStream → DataStream</td>
+      <td>
+        <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
+        <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
+        {% highlight java %}
+windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
+    public void apply(Tuple tuple,
+            Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t: values) {
+            sum += t.f1;
+        }
+        out.collect(new Integer(sum));
+    }
+});
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply(new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
+    public void apply(Window window,
+            Iterable<Tuple2<String, Integer>> values,
+            Collector<Integer> out) throws Exception {
+        int sum = 0;
+        for (Tuple2<String, Integer> t: values) {
+            sum += t.f1;
+        }
+        out.collect(new Integer(sum));
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Reduce</strong><br>WindowedStream → DataStream</td>
+      <td>
+        <p>Applies a functional reduce function to the window and returns the reduced value.</p>
+        {% highlight java %}
+windowedStream.reduce(new ReduceFunction<Tuple2<String,Integer>>() {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
+        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Fold</strong><br>WindowedStream → DataStream</td>
+      <td>
+        <p>Applies a functional fold function to the window and returns the folded value.
+        The example function, when applied on the sequence (1,2,3,4,5),
+        folds the sequence into the string "start-1-2-3-4-5":</p>
+        {% highlight java %}
+windowedStream.fold("start", new FoldFunction<Integer, String>() {
+    public String fold(String current, Integer value) {
+        return current + "-" + value;
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Aggregations on windows</strong><br>WindowedStream → DataStream</td>
+      <td>
+        <p>Aggregates the contents of a window. The difference between min
+        and minBy is that min returns the minimum value, whereas minBy returns
+        the element that has the minimum value in this field (same for max and maxBy).</p>
+        {% highlight java %}
+windowedStream.sum(0);
+windowedStream.sum("key");
+windowedStream.min(0);
+windowedStream.min("key");
+windowedStream.max(0);
+windowedStream.max("key");
+windowedStream.minBy(0);
+windowedStream.minBy("key");
+windowedStream.maxBy(0);
+windowedStream.maxBy("key");
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong><br>DataStream* → DataStream</td>
+      <td>
+        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+        with itself you will get each element twice in the resulting stream.</p>
+        {% highlight java %}
+dataStream.union(otherStream1, otherStream2, ...);
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Join</strong><br>DataStream,DataStream → DataStream</td>
+      <td>
+        <p>Join two data streams on a given key and a common window.</p>
+        {% highlight java %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply (new JoinFunction () {...});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window CoGroup</strong><br>DataStream,DataStream → DataStream</td>
+      <td>
+        <p>Cogroups two data streams on a given key and a common window.</p>
+        {% highlight java %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply (new CoGroupFunction () {...});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Connect</strong><br>DataStream,DataStream → ConnectedStreams</td>
+      <td>
+        <p>"Connects" two data streams retaining their types, allowing for shared state between
+        the two streams.</p>
+        {% highlight java %}
+DataStream<Integer> someStream = //...
+DataStream<String> otherStream = //...
+
+ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams → DataStream</td>
+      <td>
+        <p>Similar to map and flatMap on a connected data stream:</p>
+        {% highlight java %}
+connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
+    @Override
+    public Boolean map1(Integer value) {
+        return true;
+    }
+
+    @Override
+    public Boolean map2(String value) {
+        return false;
+    }
+});
+connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
+
+    @Override
+    public void flatMap1(Integer value, Collector<String> out) {
+        out.collect(value.toString());
+    }
+
+    @Override
+    public void flatMap2(String value, Collector<String> out) {
+        for (String word: value.split(" ")) {
+            out.collect(word);
+        }
+    }
+});
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Split</strong><br>DataStream → SplitStream</td>
+      <td>
+        <p>
+        Split the stream into two or more streams according to some criterion.
+        {% highlight java %}
+SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
+    @Override
+    public Iterable<String> select(Integer value) {
+        List<String> output = new ArrayList<String>();
+        if (value % 2 == 0) {
+            output.add("even");
+        }
+        else {
+            output.add("odd");
+        }
+        return output;
+    }
+});
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Select</strong><br>SplitStream → DataStream</td>
+      <td>
+        <p>
+        Select one or more streams from a split stream.
+        {% highlight java %}
+SplitStream<Integer> split;
+DataStream<Integer> even = split.select("even");
+DataStream<Integer> odd = split.select("odd");
+DataStream<Integer> all = split.select("even","odd");
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Iterate</strong><br>DataStream → IterativeStream → DataStream</td>
+      <td>
+        <p>
+        Creates a "feedback" loop in the flow, by redirecting the output of one operator
+        to some previous operator. This is especially useful for defining algorithms that
+        continuously update a model. The following code starts with a stream and applies
+        the iteration body continuously. Elements that are greater than 0 are sent back
+        to the feedback channel, and the rest of the elements are forwarded downstream.
+        See <a href="#iterations">iterations</a> for a complete description.
+        {% highlight java %}
+IterativeStream<Long> iteration = initialStream.iterate();
+DataStream<Long> iterationBody = iteration.map(/*do something*/);
+DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value > 0;
+    }
+});
+iteration.closeWith(feedback);
+DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
+    @Override
+    public boolean filter(Long value) throws Exception {
+        return value <= 0;
+    }
+});
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Extract Timestamps</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Extracts timestamps from records in order to work with windows
+        that use event time semantics. See <a href="#working-with-time">working with time</a>.
+        {% highlight java %}
+stream.assignTimestamps(new TimestampExtractor() {...});
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Map</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Takes one element and produces one element. A map function that doubles the values of the input stream:</p>
+        {% highlight scala %}
+dataStream.map { x => x * 2 }
+        {% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:</p>
+        {% highlight scala %}
+dataStream.flatMap { str => str.split(" ") }
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Filter</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for which the function returns true.
+        A filter that filters out zero values:
+        </p>
+        {% highlight scala %}
+dataStream.filter { _ != 0 }
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>KeyBy</strong><br>DataStream → KeyedStream</td>
+      <td>
+        <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
+        Internally, this is implemented with hash partitioning. See <a href="#specifying-keys">keys</a> on how to specify keys.
+        This transformation returns a KeyedStream.</p>
+        {% highlight scala %}
+dataStream.keyBy("someKey") // Key by field "someKey"
+dataStream.keyBy(0) // Key by the first element of a Tuple
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Reduce</strong><br>KeyedStream → DataStream</td>
+      <td>
+        <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
+        emits the new value.</p>
+        <p>A reduce function that creates a stream of partial sums:</p>
+        {% highlight scala %}
+keyedStream.reduce { _ + _ }
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Fold</strong><br>KeyedStream → DataStream</td>
+      <td>
+        <p>A "rolling" fold on a keyed data stream with an initial value.
+        Combines the current element with the last folded value and
+        emits the new value.</p>
+        <p>A fold function that, when applied on the sequence (1,2,3,4,5),
+        emits the sequence "start-1", "start-1-2", "start-1-2-3", ...</p>
+        {% highlight scala %}
+val result: DataStream[String] =
+  keyedStream.fold("start", (str, i) => { str + "-" + i })
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Aggregations</strong><br>KeyedStream → DataStream</td>
+      <td>
+        <p>Rolling aggregations on a keyed data stream. The difference between min
+        and minBy is that min returns the minimum value, whereas minBy returns
+        the element that has the minimum value in this field (same for max and maxBy).</p>
+        {% highlight scala %}
+keyedStream.sum(0)
+keyedStream.sum("key")
+keyedStream.min(0)
+keyedStream.min("key")
+keyedStream.max(0)
+keyedStream.max("key")
+keyedStream.minBy(0)
+keyedStream.minBy("key")
+keyedStream.maxBy(0)
+keyedStream.maxBy("key")
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
+        key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+        See <a href="#windows">windows</a> for a description of windows.</p>
+        {% highlight scala %}
+dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>WindowAll</strong><br>DataStream → AllWindowedStream</td>
+      <td>
+        <p>Windows can be defined on regular DataStreams. Windows group all the stream events
+        according to some characteristic (e.g., the data that arrived within the last 5 seconds).
+        See <a href="#windows">windows</a> for a complete description of windows.</p>
+        <p><strong>WARNING:</strong> This is in many cases a <strong>non-parallel</strong> transformation. All records will be
+        gathered in one task for the windowAll operator.</p>
+        {% highlight scala %}
+dataStream.windowAll(TumblingTimeWindows.of(Time.of(5, TimeUnit.SECONDS))) // Last 5 seconds of data
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Apply</strong><br>WindowedStream → DataStream<br>AllWindowedStream → DataStream</td>
+      <td>
+        <p>Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.</p>
+        <p><strong>Note:</strong> If you are using a windowAll transformation, you need to use an AllWindowFunction instead.</p>
+        {% highlight scala %}
+windowedStream.apply { WindowFunction }
+
+// applying an AllWindowFunction on non-keyed window stream
+allWindowedStream.apply { AllWindowFunction }
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Reduce</strong><br>WindowedStream → DataStream</td>
+      <td>
+        <p>Applies a functional reduce function to the window and returns the reduced value.</p>
+        {% highlight scala %}
+windowedStream.reduce { _ + _ }
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Fold</strong><br>WindowedStream → DataStream</td>
+      <td>
+        <p>Applies a functional fold function to the window and returns the folded value.
+        The example function, when applied on the sequence (1,2,3,4,5),
+        folds the sequence into the string "start-1-2-3-4-5":</p>
+        {% highlight scala %}
+val result: DataStream[String] =
+  windowedStream.fold("start", (str, i) => { str + "-" + i })
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Aggregations on windows</strong><br>WindowedStream → DataStream</td>
+      <td>
+        <p>Aggregates the contents of a window. The difference between min
+        and minBy is that min returns the minimum value, whereas minBy returns
+        the element that has the minimum value in this field (same for max and maxBy).</p>
+        {% highlight scala %}
+windowedStream.sum(0)
+windowedStream.sum("key")
+windowedStream.min(0)
+windowedStream.min("key")
+windowedStream.max(0)
+windowedStream.max("key")
+windowedStream.minBy(0)
+windowedStream.minBy("key")
+windowedStream.maxBy(0)
+windowedStream.maxBy("key")
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong><br>DataStream* → DataStream</td>
+      <td>
+        <p>Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream
+        with itself you will get each element twice in the resulting stream.</p>
+        {% highlight scala %}
+dataStream.union(otherStream1, otherStream2, ...)
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window Join</strong><br>DataStream,DataStream → DataStream</td>
+      <td>
+        <p>Join two data streams on a given key and a common window.</p>
+        {% highlight scala %}
+dataStream.join(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply { ... }
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Window CoGroup</strong><br>DataStream,DataStream → DataStream</td>
+      <td>
+        <p>Cogroups two data streams on a given key and a common window.</p>
+        {% highlight scala %}
+dataStream.coGroup(otherStream)
+    .where(0).equalTo(1)
+    .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
+    .apply {}
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Connect</strong><br>DataStream,DataStream → ConnectedStreams</td>
+      <td>
+        <p>"Connects" two data streams retaining their types, allowing for shared state between
+        the two streams.</p>
+        {% highlight scala %}
+val someStream : DataStream[Int] = ...
+val otherStream : DataStream[String] = ...
+
+val connectedStreams = someStream.connect(otherStream)
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>CoMap, CoFlatMap</strong><br>ConnectedStreams → DataStream</td>
+      <td>
+        <p>Similar to map and flatMap on a connected data stream:</p>
+        {% highlight scala %}
+connectedStreams.map(
+    (_ : Int) => true,
+    (_ : String) => false
+)
+connectedStreams.flatMap(
+    (num : Int) => List(num.toString),
+    (str : String) => str.split(" ").toList
+)
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Split</strong><br>DataStream → SplitStream</td>
+      <td>
+        <p>
+        Split the stream into two or more streams according to some criterion.
+        {% highlight scala %}
+val split = someDataStream.split(
+  (num: Int) =>
+    (num % 2) match {
+      case 0 => List("even")
+      case 1 => List("odd")
+    }
+)
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Select</strong><br>SplitStream → DataStream</td>
+      <td>
+        <p>
+        Select one or more streams from a split stream.
+        {% highlight scala %}
+val even = split select "even"
+val odd = split select "odd"
+val all = split.select("even","odd")
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Iterate</strong><br>DataStream → IterativeStream → DataStream</td>
+      <td>
+        <p>
+        Creates a "feedback" loop in the flow, by redirecting the output of one operator
+        to some previous operator. This is especially useful for defining algorithms that
+        continuously update a model. The following code starts with a stream and applies
+        the iteration body continuously. Elements that are greater than 0 are sent back
+        to the feedback channel, and the rest of the elements are forwarded downstream.
+        See <a href="#iterations">iterations</a> for a complete description.
+        {% highlight scala %}
+initialStream.iterate {
+  iteration => {
+    val iterationBody = iteration.map {/*do something*/}
+    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
+  }
+}
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Extract Timestamps</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Extracts timestamps from records in order to work with windows
+        that use event time semantics.
+        See <a href="#working-with-time">working with time</a>.
+        {% highlight scala %}
+stream.assignTimestamps { timestampExtractor }
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+The following transformations are available on data streams of Tuples:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Project</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples:
+{% highlight java %}
+DataStream<Tuple3<Integer, Double, String>> in = // [...]
+DataStream<Tuple2<String, Integer>> out = in.project(2,0);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Project</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>Selects a subset of fields from the tuples:
+{% highlight scala %}
+val in : DataStream[(Int,Double,String)] = // [...]
+val out = in.project(2,0)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+### Physical partitioning
+
+Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation,
+via the following functions.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Hash partitioning</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Identical to keyBy but returns a DataStream instead of a KeyedStream.
+        {% highlight java %}
+dataStream.partitionByHash("someKey");
+dataStream.partitionByHash(0);
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Uses a user-defined Partitioner to select the target task for each element.
+        {% highlight java %}
+dataStream.partitionCustom(new Partitioner(){...}, "someKey");
+dataStream.partitionCustom(new Partitioner(){...}, 0);
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Random partitioning</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Partitions elements randomly according to a uniform distribution.
+        {% highlight java %}
+dataStream.partitionRandom();
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rebalancing (Round-robin partitioning)</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Partitions elements round-robin, creating equal load per partition. Useful for performance
+        optimization in the presence of data skew.
+        {% highlight java %}
+dataStream.rebalance();
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Broadcasting</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Broadcasts elements to every partition.
+        {% highlight java %}
+dataStream.broadcast();
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Hash partitioning</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Identical to keyBy but returns a DataStream instead of a KeyedStream.
+        {% highlight scala %}
+dataStream.partitionByHash("someKey")
+dataStream.partitionByHash(0)
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Custom partitioning</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Uses a user-defined Partitioner to select the target task for each element.
+        {% highlight scala %}
+dataStream.partitionCustom(partitioner, "someKey")
+dataStream.partitionCustom(partitioner, 0)
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Random partitioning</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Partitions elements randomly according to a uniform distribution.
+        {% highlight scala %}
+dataStream.partitionRandom()
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Rebalancing (Round-robin partitioning)</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Partitions elements round-robin, creating equal load per partition. Useful for performance
+        optimization in the presence of data skew.
+        {% highlight scala %}
+dataStream.rebalance()
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Broadcasting</strong><br>DataStream → DataStream</td>
+      <td>
+        <p>
+        Broadcasts elements to every partition.
+        {% highlight scala %}
+dataStream.broadcast()
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+### Task chaining and resource groups
+
+Chaining two subsequent transformations means co-locating them within the same thread for better
+performance. Flink by default chains operators if this is possible (e.g., two subsequent map
+transformations). The API gives fine-grained control over chaining if desired:
+
+Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to disable chaining in
+the whole job. For more fine-grained control, the following functions are available. Note that
+these functions can only be used right after a DataStream transformation, as they refer to the
+previous transformation. For example, you can use `someStream.map(...).startNewChain()`, but
+you cannot use `someStream.startNewChain()`.
+
+A resource group is a slot in Flink; see
+[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots). You can
+manually isolate operators in separate slots if desired.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+        mappers will be chained, and filter will not be chained to
+        the first mapper.
+{% highlight java %}
+someStream.filter(...).map(...).startNewChain().map(...);
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator:
+{% highlight java %}
+someStream.map(...).disableChaining();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Start a new resource group</td>
+      <td>
+        <p>Start a new resource group containing the map and the subsequent operators:
+{% highlight java %}
+someStream.filter(...).startNewResourceGroup();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Isolate resources</td>
+      <td>
+        <p>Isolate the operator in its own slot:
+{% highlight java %}
+someStream.map(...).isolateResources();
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Start new chain</td>
+      <td>
+        <p>Begin a new chain, starting with this operator. The two
+        mappers will be chained, and filter will not be chained to
+        the first mapper.
+{% highlight scala %}
+someStream.filter(...).map(...).startNewChain().map(...)
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Disable chaining</td>
+      <td>
+        <p>Do not chain the map operator:
+{% highlight scala %}
+someStream.map(...).disableChaining()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Start a new resource group</td>
+      <td>
+        <p>Start a new resource group containing the map and the subsequent operators:
+{% highlight scala %}
+someStream.filter(...).startNewResourceGroup()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td>Isolate resources</td>
+      <td>
+        <p>Isolate the operator in its own slot:
+{% highlight scala %}
+someStream.map(...).isolateResources()
+{% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+
+{% top %}
+
+Specifying Keys
+----------------
+
+The `keyBy` transformation requires that a key is defined on
+its argument DataStream.
+
+A DataStream is keyed as follows:
+{% highlight java %}
+DataStream<...> input = // [...]
+DataStream<...> windowed = input
+  .keyBy(/*define key here*/)
+  .window(/*define window here*/);
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data stream types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#specifying-keys) on how to specify keys.
+Just replace `DataSet` with `DataStream`, and `groupBy` with `keyBy`.
+
+
+
+Passing Functions to Flink
+--------------------------
+
+Some transformations take user-defined functions as arguments.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#passing-functions-to-flink).
+
+
+{% top %}
+
+
+Data Types
+----------
+
+Flink places some restrictions on the type of elements that are used in DataStreams and in results
+of transformations. The reason for this is that the system analyzes the types to determine
+efficient execution strategies.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#data-types).
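+
+As a quick illustration of the supported types, stream elements can for example be Flink
+tuples or POJOs. The following minimal sketch (with `WordCount` as a hypothetical example
+type) shows a POJO that Flink's type analysis can handle, and how its fields can then serve
+as keys:
+
+{% highlight java %}
+// A POJO: a public class with a public no-argument constructor and
+// public fields (or getters/setters).
+public class WordCount {
+    public String word;
+    public int count;
+
+    public WordCount() {}
+
+    public WordCount(String word, int count) {
+        this.word = word;
+        this.count = count;
+    }
+}
+
+DataStream<WordCount> wordCounts = env.fromElements(
+    new WordCount("hello", 1),
+    new WordCount("world", 2));
+
+// POJO fields can be used as keys by name:
+wordCounts.keyBy("word");
+{% endhighlight %}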
+
+{% top %}
+
+
+Data Sources
+------------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Sources can be created by using `StreamExecutionEnvironment.addSource(sourceFunction)`.
+You can either use one of the source functions that come with Flink or write a custom source
+by implementing the `SourceFunction` interface for non-parallel sources, or by implementing the
+`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` for parallel sources.
+
+There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line-wise and returns them as Strings.
+
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line-wise and returns them as
+  StringValues. StringValues are mutable strings.
+
+- `readFile(path)` / Any input format - Reads files as dictated by the input format.
+
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.
+
+- `readFileStream` - Creates a stream by appending elements when there are changes to a file.
+
+Socket-based:
+
+- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.
+
+Collection-based:
+
+- `fromCollection(Collection)` - Creates a data stream from a Java `java.util.Collection`. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator, Class)` - Creates a data stream from an iterator. The class specifies the
+  data type of the elements returned by the iterator.
+
+- `fromElements(T ...)` - Creates a data stream from the given sequence of objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator, Class)` - Creates a data stream from an iterator, in
+  parallel. The class specifies the data type of the elements returned by the iterator.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attaches a new source function. For example, to read from Apache Kafka you can use
+  `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Sources can be created by using `StreamExecutionEnvironment.addSource(sourceFunction)`.
+You can either use one of the source functions that come with Flink or write a custom source
+by implementing the `SourceFunction` interface for non-parallel sources, or by implementing the
+`ParallelSourceFunction` interface or extending `RichParallelSourceFunction` for parallel sources.
+
+There are several predefined stream sources accessible from the `StreamExecutionEnvironment`:
+
+File-based:
+
+- `readTextFile(path)` / `TextInputFormat` - Reads files line-wise and returns them as Strings.
+
+- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line-wise and returns them as
+  StringValues. StringValues are mutable strings.
+
+- `readFile(path)` / Any input format - Reads files as dictated by the input format.
+
+- `readFileOfPrimitives(path, Class)` / `PrimitiveInputFormat` - Parses files of new-line (or another char sequence) delimited primitive data types such as `String` or `Integer`.
+
+- `readFileStream` - Creates a stream by appending elements when there are changes to a file.
+
+Socket-based:
+
+- `socketTextStream` - Reads from a socket. Elements can be separated by a delimiter.
+
+Collection-based:
+
+- `fromCollection(Seq)` - Creates a data stream from a Scala `Seq`. All elements
+  in the collection must be of the same type.
+
+- `fromCollection(Iterator)` - Creates a data stream from an iterator.
+
+- `fromElements(elements: _*)` - Creates a data stream from the given sequence of objects. All objects must be
+  of the same type.
+
+- `fromParallelCollection(SplittableIterator)` - Creates a data stream from an iterator, in
+  parallel.
+
+- `generateSequence(from, to)` - Generates the sequence of numbers in the given interval, in
+  parallel.
+
+Custom:
+
+- `addSource` - Attaches a new source function. For example, to read from Apache Kafka you can use
+  `addSource(new FlinkKafkaConsumer082<>(...))`. See [connectors]({{ site.baseurl }}/apis/streaming/connectors/) for more details.
+
+</div>
+</div>
+
+{% top %}
+
+
+Execution Configuration
+----------
+
+The `StreamExecutionEnvironment` also contains the `ExecutionConfig`, which allows you to set job-specific configuration values for the runtime.
+
+See [the relevant section of the DataSet API documentation]({{ site.baseurl }}/apis/batch/index.html#execution-configuration).
+
+Parameters in the `ExecutionConfig` that pertain specifically to the DataStream API are:
+
+- `enableTimestamps()` / **`disableTimestamps()`**: Attach a timestamp to each event emitted from a source.
+  `areTimestampsEnabled()` returns the current value.
+
+- `setAutoWatermarkInterval(long milliseconds)`: Set the interval for automatic watermark emission. You can
+  get the current value with `long getAutoWatermarkInterval()`.
+
+{% top %}
+
+Data Sinks
+----------
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
+Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
+DataStreams:
+
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *toString()* method of each element.
+
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
+  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
+
+- `print()` / `printToErr()` - Prints the *toString()* value
+  of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is
+  prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
+  greater than 1, the output will also be prepended with the identifier of the task which produced the output.
+
+- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
+  custom object-to-bytes conversion.
+
+- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`.
+
+- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
+  Apache Kafka) that are implemented as sink functions.
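+
+As a minimal sketch of the last point, a custom sink can be defined inline (the `stream`
+variable and the sink body here are hypothetical placeholders; a real sink would typically
+forward records to an external system):
+
+{% highlight java %}
+stream.addSink(new SinkFunction<String>() {
+    @Override
+    public void invoke(String value) throws Exception {
+        // Hypothetical handling: print each record; replace this with
+        // calls to the external system of your choice.
+        System.out.println("sink received: " + value);
+    }
+});
+{% endhighlight %}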
+
+</div>
+<div data-lang="scala" markdown="1">
+
+<br />
+
+Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them.
+Flink comes with a variety of built-in output formats that are encapsulated behind operations on the
+DataStreams:
+
+- `writeAsText()` / `TextOutputFormat` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *toString()* method of each element.
+
+- `writeAsCsv(...)` / `CsvOutputFormat` - Writes tuples as comma-separated value files. Row and field
+  delimiters are configurable. The value for each field comes from the *toString()* method of the objects.
+
+- `print()` / `printToErr()` - Prints the *toString()* value
+  of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is
+  prepended to the output. This can help to distinguish between different calls to *print*. If the parallelism is
+  greater than 1, the output will also be prepended with the identifier of the task which produced the output.
+
+- `write()` / `FileOutputFormat` - Method and base class for custom file outputs. Supports
+  custom object-to-bytes conversion.
+
+- `writeToSocket` - Writes elements to a socket according to a `SerializationSchema`.
+
+- `addSink` - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as
+  Apache Kafka) that are implemented as sink functions.
+
+</div>
+</div>
+
+
+{% top %}
+
+Debugging
+---------
+
+Before running a streaming program in a distributed cluster, it is a good
+idea to make sure that the implemented algorithm works as desired. Hence, implementing data analysis
+programs is usually an incremental process of checking results, debugging, and improving.
+
+Flink provides features to significantly ease the development process of data analysis
+programs by supporting local debugging from within an IDE, injection of test data, and collection of
+result data. This section gives some hints on how to ease the development of Flink programs.
+
+### Local Execution Environment
+
+A `LocalStreamEnvironment` starts a Flink system within the same JVM process it was created in. If you
+start the LocalEnvironment from an IDE, you can set breakpoints in your code and easily debug your
+program.
+
+A LocalEnvironment is created and used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+DataStream<String> lines = env.addSource(/* some source */);
+// build your program
+
+env.execute();
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+val lines = env.addSource(/* some source */)
+// build your program
+
+env.execute()
+{% endhighlight %}
+</div>
+</div>
+
+### Collection Data Sources
+
+Flink provides special data sources which are backed
+by Java collections to ease testing. Once a program has been tested, the sources and sinks can be
+easily replaced by sources and sinks that read from / write to external systems.
+
+Collection data sources can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+// Create a DataStream from a list of elements
+DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);
+
+// Create a DataStream from any Java collection
+List<Tuple2<String, Integer>> data = ...
+DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);
+
+// Create a DataStream from an Iterator
+Iterator<Long> longIt = ...
+DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.createLocalEnvironment()
+
+// Create a DataStream from a list of elements
+val myInts = env.fromElements(1, 2, 3, 4, 5)
+
+// Create a DataStream from any Collection
+val data: Seq[(String, Int)] = ...
+val myTuples = env.fromCollection(data)
+
+// Create a DataStream from an Iterator
+val longIt: Iterator[Long] = ...
+val myLongs = env.fromCollection(longIt)
+{% endhighlight %}
+</div>
+</div>
+
+**Note:** Currently, the collection data source requires that data types and iterators implement
+`Serializable`. Furthermore, collection data sources cannot be executed in parallel
+(parallelism = 1).
+
+### Iterator Data Sink
+
+Flink also provides a sink to collect DataStream results for testing and debugging purposes. It can be used as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+import org.apache.flink.contrib.streaming.DataStreamUtils;
+
+DataStream<Tuple2<String, Integer>> myResult = ...
+Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.flink.contrib.streaming.DataStreamUtils
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+val myResult: DataStream[(String, Int)] = ...
+val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.getJavaStream).asScala
+{% endhighlight %}
+</div>
+</div>
+
+
+{% top %}
+
+
+Windows
+-------
+
+### Working with Time
+
+Windows are typically groups of events within a certain time period. Reasoning about time and windows assumes
+a definition of time. Flink has support for three kinds of time:
+
+- *Processing time:* Processing time is simply the wall clock time of the machine that happens to be
+  executing the transformation. Processing time is the simplest notion of time and provides the best
+  performance. However, in distributed and asynchronous environments processing time does not provide
+  determinism.
+
+- *Event time:* Event time is the time that each individual event occurred. This time is
+  typically embedded within the records before they enter Flink, or can be extracted from their contents.
+  When using event time, out-of-order events can be properly handled. For example, an event with a lower
+  timestamp may arrive after an event with a higher timestamp, but transformations will handle these events
+  correctly. Event time processing provides predictable results, but incurs more latency, as out-of-order
+  events need to be buffered.
+
+- *Ingestion time:* Ingestion time is the time that events enter Flink.
+  In particular, the timestamp of an event is assigned by the source operator as the current wall clock
+  time of the machine that executes the source task at the time the records enter the Flink source.
+  Ingestion time is more predictable than processing time, and gives lower latencies than event time,
+  as the latency does not depend on external systems. Ingestion time thus provides a middle ground between
+  processing time and event time. Ingestion time is a special case of event time (and indeed, it is treated
+  by Flink identically to event time).
+
+When dealing with event time, transformations need to avoid indefinite
+wait times for events to arrive. *Watermarks* provide the mechanism to control the skew between event time
+and processing time. Watermarks are emitted by the sources. A watermark with a certain timestamp denotes
+the knowledge that no event with a timestamp lower than the timestamp of the watermark will ever arrive.
+
+You can specify the semantics of time in a Flink DataStream program using the `StreamExecutionEnvironment`, as follows:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+{% endhighlight %}
+</div>
+</div>
+
+The default value is `TimeCharacteristic.ProcessingTime`, so in order to write a program with processing
+time semantics nothing needs to be specified (e.g., the first [example](#example-program) in this guide follows processing
+time semantics).
+
+In order to work with event time semantics, you need to follow three steps:
+
+- Set `env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)`
+
+- Use `DataStream.assignTimestamps(...)` in order to tell Flink how timestamps relate to events (e.g., which
+  record field is the timestamp)
+
+- Set `enableTimestamps()`, as well as the interval for watermark emission (`setAutoWatermarkInterval(long milliseconds)`)
+  in `ExecutionConfig`.
+
+For example, assume that we have a data stream of tuples, in which the first field is the timestamp (assigned
+by the system that generates these data streams), and we know that the lag between the current processing
+time and the timestamp of an event is never more than 1 second:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
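+// The extractor below pulls the timestamp from the first tuple field (f0)
+// and emits watermarks that trail each element's timestamp by 1 second,
+// the maximum expected lag: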
+stream.assignTimestamps(new TimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
+    @Override
+    public long extractTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
+        return element.f0;
+    }
+
+    @Override
+    public long extractWatermark(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
+        return element.f0 - 1000;
+    }
+
+    @Override
+    public long getCurrentWatermark() {
+        return Long.MIN_VALUE;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val stream: DataStream[(Long, Int, Double, String)] = ...
+stream.assignTimestamps(new TimestampExtractor[(Long, Int, Double, String)] {
+  override def extractTimestamp(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1
+
+  override def extractWatermark(element: (Long, Int, Double, String), currentTimestamp: Long): Long = element._1 - 1000
+
+  override def getCurrentWatermark: Long = Long.MinValue
+})
+{% endhighlight %}
+</div>
+</div>
+
+If you know that the timestamps of events are always ascending, i.e., elements arrive in order, you can use
+the `AscendingTimestampExtractor`, and the system generates watermarks automatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple4<Long,Integer,Double,String>> stream = //...
+stream.assignTimestamps(new AscendingTimestampExtractor<Tuple4<Long,Integer,Double,String>>() {
+    @Override
+    public long extractAscendingTimestamp(Tuple4<Long,Integer,Double,String> element, long currentTimestamp) {
+        return element.f0;
+    }
+});
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+stream.assignAscendingTimestamps(record => record._1)
+{% endhighlight %}
+</div>
+</div>
+
+In order to write a program with ingestion time semantics, you need to
+set `env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)`. You can think of this setting as a
+shortcut for writing a `TimestampExtractor` which assigns timestamps to events at the sources
+based on the current source wall-clock time. Flink injects this timestamp extractor automatically.
+
+
+### Windows on Keyed Data Streams
+
+Flink offers a variety of methods for defining windows on a `KeyedStream`. All of these group elements *per key*,
+i.e., each window will contain elements with the same key value.
+
+#### Basic Window Constructs
+
+Flink offers a general window mechanism that provides flexibility, as well as a number of pre-defined windows
+for common use cases. Check first whether your use case can be served by the pre-defined windows below before
+defining your own windows.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Tumbling time window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 5 seconds that "tumbles". This means that elements are
+        grouped according to their timestamp in groups of 5-second duration, and every element belongs to exactly one window.
+        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+        {% highlight java %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS));
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sliding time window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 5 seconds that "slides" by 1 second. This means that elements are
+        grouped according to their timestamp in groups of 5-second duration, and elements can belong to more than
+        one window (since windows overlap by at most 4 seconds).
+        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+        {% highlight java %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS));
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Tumbling count window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 1000 elements that "tumbles". This means that elements are
+        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
+        and every element belongs to exactly one window.
+        {% highlight java %}
+keyedStream.countWindow(1000);
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sliding count window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are
+        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
+        and every element can belong to more than one window (as windows overlap by at most 900 elements).
+        {% highlight java %}
+keyedStream.countWindow(1000, 100);
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+
+<div data-lang="scala" markdown="1">
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Tumbling time window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 5 seconds that "tumbles". This means that elements are
+        grouped according to their timestamp in groups of 5-second duration, and every element belongs to exactly one window.
+        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+        {% highlight scala %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS))
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sliding time window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 5 seconds that "slides" by 1 second. This means that elements are
+        grouped according to their timestamp in groups of 5-second duration, and elements can belong to more than
+        one window (since windows overlap by at most 4 seconds).
+        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+        {% highlight scala %}
+keyedStream.timeWindow(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Tumbling count window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 1000 elements that "tumbles". This means that elements are
+        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
+        and every element belongs to exactly one window.
+        {% highlight scala %}
+keyedStream.countWindow(1000)
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sliding count window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Defines a window of 1000 elements that "slides" every 100 elements. This means that elements are
+        grouped according to their arrival time (equivalent to processing time) in groups of 1000 elements,
+        and every element can belong to more than one window (as windows overlap by at most 900 elements).
+        {% highlight scala %}
+keyedStream.countWindow(1000, 100)
+        {% endhighlight %}
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+</div>
+</div>
+
+#### Advanced Window Constructs
+
+The general mechanism can define more powerful windows at the cost of more verbose syntax. For example,
+below is a window definition where windows hold elements of the last 5 seconds and slide every 1 second,
+but the execution of the window function is triggered when 100 elements have been added to the
+window, and every time execution is triggered, 10 elements are retained in the window:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+keyedStream
+    .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+    .trigger(CountTrigger.of(100))
+    .evictor(CountEvictor.of(10));
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+keyedStream
+  .window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+  .trigger(CountTrigger.of(100))
+  .evictor(CountEvictor.of(10))
+{% endhighlight %}
+</div>
+</div>
+
+The general recipe for building a custom window is to specify (1) a `WindowAssigner`, (2) a `Trigger` (optionally),
+and (3) an `Evictor` (optionally).
+
+The `WindowAssigner` defines how incoming elements are assigned to windows. A window is a logical group of elements
+that has a begin-value and an end-value corresponding to a begin-time and end-time. Elements with a timestamp (according
+to some notion of time described above) within these values are part of the window.
+
+For example, the `SlidingTimeWindows`
+assigner in the code above defines a window of size 5 seconds, and a slide of 1 second. Assume that
+time starts from 0 and is measured in milliseconds. Then, the first six (overlapping) windows are
+[0,5000], [1000,6000], [2000,7000], [3000,8000], [4000,9000], and [5000,10000]. Each incoming
+element is assigned to the windows according to its timestamp. For example, an element with timestamp 2000 will be
+assigned to the first three windows. Flink comes bundled with window assigners that cover the most common use cases. You can write your
+own window types by extending the `WindowAssigner` class.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Global window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        All incoming elements of a given key are assigned to the same window.
+        The window does not have a default trigger, hence it will never be triggered
+        if a trigger is not explicitly specified.
+        </p>
+        {% highlight java %}
+stream.window(GlobalWindows.create());
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Tumbling time windows</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Incoming elements are assigned to a window of a certain size (1 second below) based on
+        their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
+        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+        The window comes with a default trigger. For event/ingestion time, a window is triggered when a
+        watermark with a value higher than its end-value is received, whereas for processing time it is
+        triggered when the current processing time exceeds its end-value.
+        </p>
+        {% highlight java %}
+stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)));
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sliding time windows</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Incoming elements are assigned to a window of a certain size (5 seconds below) based on
+        their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
+        overlap. The window comes with a default trigger. For event/ingestion time, a window is triggered when a
+        watermark with a value higher than its end-value is received, whereas for processing time it is
+        triggered when the current processing time exceeds its end-value.
+        </p>
+        {% highlight java %}
+stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)));
+        {% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+<div data-lang="scala" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Global window</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        All incoming elements of a given key are assigned to the same window.
+        The window does not have a default trigger, hence it will never be triggered
+        if a trigger is not explicitly specified.
+        </p>
+        {% highlight scala %}
+stream.window(GlobalWindows.create)
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Tumbling time windows</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Incoming elements are assigned to a window of a certain size (1 second below) based on
+        their timestamp. Windows do not overlap, i.e., each element is assigned to exactly one window.
+        The notion of time is specified by the selected TimeCharacteristic (see <a href="#working-with-time">time</a>).
+        The window comes with a default trigger. For event/ingestion time, a window is triggered when a
+        watermark with a value higher than its end-value is received, whereas for processing time it is
+        triggered when the current processing time exceeds its end-value.
+        </p>
+        {% highlight scala %}
+stream.window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+        {% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Sliding time windows</strong><br>KeyedStream → WindowedStream</td>
+      <td>
+        <p>
+        Incoming elements are assigned to a window of a certain size (5 seconds below) based on
+        their timestamp. Windows "slide" by the provided value (1 second in the example), and hence
+        overlap. The window comes with a default trigger.
+        For event/ingestion time, a window is triggered when a
+        watermark with a value higher than its end-value is received, whereas for processing time it is
+        triggered when the current processing time exceeds its end-value.
+        </p>
+        {% highlight scala %}
+stream.window(SlidingTimeWindows.of(Time.of(5, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+        {% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+</div>
+
+</div>
+
+The `Trigger` specifies when the function that comes after the window clause (e.g., `sum`, `count`) is evaluated ("fires")
+for each window. If a trigger is not specified, a default trigger for each window type is used (it is part of the
+definition of the `WindowAssigner`). Flink comes bundled with a set of triggers that you can use if the ones that
+windows use by default do not fit the application. You can write your own trigger by implementing the `Trigger`
+interface. Note that specifying a trigger overrides the default trigger of the window assigner.
+
+<div class="codetabs" markdown="1">
+
+<div data-lang="java" markdown="1">
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><strong>Processing time trigger</strong></td>
+      <td>
+        <p>
+        A window fires when the current processing time exceeds its end-value.
+        The elements in the triggered window are then discarded.
+        </p>
+{% highlight java %}
+windowedStream.trigger(ProcessingTimeTrigger.create());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Watermark trigger</strong></td>
+      <td>
+        <p>
+        A window fires when a watermark with a value that exceeds the window's end-value has been received.
+        The elements in the triggered window are then discarded.
+        </p>
+{% highlight java %}
+windowedStream.trigger(EventTimeTrigger.create());
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Continuous processing time trigger</strong></td>
+      <td>
+        <p>
+        A window is periodically considered for firing (every 5 seconds in the example).
+        The window actually fires only when the current processing time exceeds its end-value.
+        The elements in the triggered window are retained.
+        </p>
+{% highlight java %}
+windowedStream.trigger(ContinuousProcessingTimeTrigger.of(Time.of(5, TimeUnit.SECONDS)));
+{% endhigh
<TRUNCATED>