[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019490#comment-16019490 ]
ASF GitHub Bot commented on FLINK-5886:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3838#discussion_r117722927

--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@

---
title: "Python Programming Guide (Streaming)"
is_beta: true
nav-title: Python API
nav-parent_id: streaming
nav-pos: 63
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

Streaming analysis programs in Flink are regular programs that implement transformations on
streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially
created from certain sources (e.g., by reading from Apache Kafka, by reading files, or from collections).
Results are returned via sinks, which may for example write the data to (distributed) files, or to
standard output (for example the command line terminal). Flink streaming programs run in a variety
of contexts: standalone, or embedded in other programs. The execution can happen in a local JVM, or
on clusters of many machines.

In order to create your own Flink streaming program, we encourage you to start with the
[program skeleton](#program-skeleton) and gradually add your own
[transformations](#transformations). The remaining sections act as references for additional
operations and advanced features.

* This will be replaced by the TOC
{:toc}

Jython Framework
---------------
The Flink Python streaming API uses the Jython framework (see <http://www.jython.org/archive/21/docs/whatis.html>)
to drive the execution of a given script. The Python streaming layer is actually a thin wrapper
around the existing Java streaming APIs.

#### Constraints
There are two main constraints for using Jython:

* The latest supported Python version is 2.7
* It is not straightforward to use Python C extensions
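Both constraints stem from the fact that the script runs on Jython rather than CPython: Flink's Java classes are imported with regular Python `import` statements and can be subclassed directly from Python code, which is what makes the thin wrapper approach work. A minimal sketch of this interop (the `IsEven` class is purely illustrative, not part of the API):

{% highlight python %}
# Jython resolves this import against the Java classpath,
# not against installed Python packages.
from org.apache.flink.api.common.functions import FilterFunction

# A Python class can implement the Java interface directly;
# instances of it are handed to the Java streaming operators as-is.
class IsEven(FilterFunction):
    def filter(self, value):
        return value % 2 == 0
{% endhighlight %}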
Streaming Program Example
-------------------------
The following streaming program is a complete, working example of WordCount. You can copy & paste the code
to run it locally (see the notes later in this section). It counts the number of occurrences of each word
(case insensitive) in a stream of sentences, over windows of 50 milliseconds, and prints the results to
standard output.

{% highlight python %}
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds


class Generator(SourceFunction):
    def __init__(self, num_iters):
        self._running = True
        self._num_iters = num_iters

    def run(self, ctx):
        counter = 0
        while self._running and counter < self._num_iters:
            ctx.collect('Hello World')
            counter += 1

    def cancel(self):
        self._running = False


class Tokenizer(FlatMapFunction):
    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))


class Selector(KeySelector):
    def getKey(self, input):
        return input[1]


class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, word1 = input1
        count2, word2 = input2
        return (count1 + count2, word1)


def main():
    env = PythonStreamExecutionEnvironment.get_execution_environment()
    env.create_python_source(Generator(num_iters=1000)) \
        .flat_map(Tokenizer()) \
        .key_by(Selector()) \
        .time_window(milliseconds(50)) \
        .reduce(Sum()) \
        .print()
    env.execute()


if __name__ == '__main__':
    main()
{% endhighlight %}

**Notes:**

- If execution is done on a local cluster, you may replace the last line in the `main()` function
  with **`env.execute(True)`**.
- Execution on a multi-node cluster requires a shared storage medium (e.g. HDFS), which needs to be
  configured upfront.
- The output of the given script is directed to standard output. Consequently, the output
  is written to the corresponding worker's `.out` file. If the script is executed inside the IntelliJ IDE,
  then the output will be displayed in the console tab.

{% top %}

Program Skeleton
----------------
As we already saw in the example, Flink streaming programs look like regular Python programs.
Each program consists of the same basic parts:

1. A `main()` function definition, without arguments - the program entry point,
2. Obtain an `Environment`,
3. Load/create the initial data,
4. Specify transformations on this data,
5. Specify where to put the results of your computations, and
6. Execute your program.

We will now give an overview of each of those steps but please refer to the respective sections for
more details.

The `main()` function is required; it is used by the Flink execution layer to run the
given Python streaming program.

The `Environment` is the basis for all Flink programs. You can
obtain one using the following static method on class `PythonStreamExecutionEnvironment`:

{% highlight python %}
get_execution_environment()
{% endhighlight %}

For specifying data sources the streaming execution environment has several methods.
To just read a text file as a sequence of lines, you can use:

{% highlight python %}
env = get_execution_environment()
text = env.read_text_file("file:///path/to/file")
{% endhighlight %}

This will give you a DataStream on which you can then apply transformations. For
more information on data sources and input formats, please refer to
[Data Sources](#data-sources).
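Custom sources follow the same pattern as the `Generator` class in the WordCount example above: subclass the Java `SourceFunction` and pass an instance to `create_python_source`. As a minimal sketch (the `ListSource` class is illustrative only, not part of the API), a source that emits the elements of a Python list could look like this:

{% highlight python %}
from org.apache.flink.streaming.api.functions.source import SourceFunction


class ListSource(SourceFunction):
    # Emits each element of a Python list once, then finishes.
    def __init__(self, elements):
        self._elements = elements
        self._running = True

    def run(self, ctx):
        for element in self._elements:
            if not self._running:
                break
            ctx.collect(element)

    def cancel(self):
        self._running = False


# 'env' is the PythonStreamExecutionEnvironment obtained above.
text = env.create_python_source(ListSource(["to be", "or not to be"]))
{% endhighlight %}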
Once you have a DataStream you can apply transformations to create a new
DataStream which you can then write to a file, transform again, or
combine with other DataStreams. You apply transformations by calling
methods on DataStream with your own custom transformation function. For example,
a map transformation looks like this:

{% highlight python %}
class Doubler(MapFunction):
    def map(self, value):
        return value * 2

data.map(Doubler())
{% endhighlight %}

This will create a new DataStream by doubling every value in the original DataStream.
For more information and a list of all the transformations,
please refer to [Transformations](#transformations).

Once you have a DataStream that needs to be written to disk you can call one
of these methods on DataStream:

{% highlight python %}
data.write_as_text("<file-path>")
data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
data.print()
{% endhighlight %}

The last method is only useful for developing/debugging on a local machine;
it will output the contents of the DataStream to standard output. (Note that in
a cluster, the result goes to the standard out stream of the cluster nodes and ends
up in the *.out* files of the workers.)
The first two do as their names suggest.
Please refer to [Data Sinks](#data-sinks) for more information on writing to files.

Once you have specified the complete program you need to call `execute` on
the `Environment`. This will either execute on your local machine or submit your program
for execution on a cluster, depending on how Flink was started. You can force
a local execution by using `execute(True)`.

{% top %}

Project setup
---------------

Apart from setting up Flink, no additional work is required. Using Jython to execute the Python
script means that no external packages are needed and the program is executed as if it were a jar file.

The Python API was tested on Linux/OSX systems.

{% top %}

Lazy Evaluation
---------------

All Flink programs are executed lazily: when the program's main method is executed, the data loading
and transformations do not happen directly. Rather, each operation is created and added to the
program's plan. The operations are actually executed when the `execute()` method is invoked
on the Environment object. Whether the program is executed locally or on a cluster depends
on the environment of the program.

The lazy evaluation lets you construct sophisticated programs that Flink executes as one
holistically planned unit.

{% top %}

Transformations
---------------

Data transformations transform one or more DataStreams into a new DataStream. Programs can combine
multiple transformations into sophisticated assemblies.

This section gives a brief overview of the available transformations. The [transformations
documentation](dataset_transformations.html) has a full description of all transformations with
examples.

<br />

<table class="table table-bordered">
  <thead>
    <tr>
      <th class="text-left" style="width: 20%">Transformation</th>
      <th class="text-center">Description</th>
    </tr>
  </thead>

  <tbody>
    <tr>
      <td><strong>Map</strong><br>PythonDataStream → PythonDataStream</td>
      <td>
        <p>Takes one element and produces one element.</p>
{% highlight python %}
class Doubler(MapFunction):
    def map(self, value):
        return value * 2

data_stream.map(Doubler())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>FlatMap</strong><br>PythonDataStream → PythonDataStream</td>
      <td>
        <p>Takes one element and produces zero, one, or more elements.</p>
{% highlight python %}
class Tokenizer(FlatMapFunction):
    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))

data_stream.flat_map(Tokenizer())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Filter</strong><br>PythonDataStream → PythonDataStream</td>
      <td>
        <p>Evaluates a boolean function for each element and retains those for which the function
        returns true.</p>
{% highlight python %}
class GreaterThan1000(FilterFunction):
    def filter(self, value):
        return value > 1000

data_stream.filter(GreaterThan1000())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>KeyBy</strong><br>PythonDataStream → PythonKeyedStream</td>
      <td>
        <p>Logically partitions a stream into disjoint partitions, each partition containing elements of the same key.
        Internally, this is implemented with hash partitioning. See <a href="/dev/api_concepts#specifying-keys">keys</a> on how to specify keys.
        This transformation returns a PythonKeyedStream.</p>
{% highlight python %}
class Selector(KeySelector):
    def getKey(self, input):
        return input[1]  # Key by the second element in a tuple

data_stream.key_by(Selector())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Reduce</strong><br>PythonKeyedStream → PythonDataStream</td>
      <td>
        <p>A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and
        emits the new value.</p>
{% highlight python %}
class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, val1 = input1
        count2, val2 = input2
        return (count1 + count2, val1)

keyed_stream.reduce(Sum())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Window</strong><br>PythonKeyedStream → PythonWindowedStream</td>
      <td>
        <p>Windows can be defined on already partitioned KeyedStreams. Windows group the data in each
        key according to some characteristic (e.g., the data that arrived within the last 5 seconds).
        See <a href="windows.html">windows</a> for a complete description of windows.</p>
{% highlight python %}
keyed_stream.count_window(10, 5)  # Last 10 elements, sliding (jumping) by 5 elements

keyed_stream.time_window(milliseconds(30))  # Last 30 milliseconds of data

keyed_stream.time_window(milliseconds(100), milliseconds(20))  # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Window Apply</strong><br>PythonWindowedStream → PythonDataStream</td>
      <td>
        <p>Applies a general function to the window as a whole. Below is a function that manually sums
        the elements of a window.</p>
{% highlight python %}
class WindowSum(WindowFunction):
    def apply(self, key, window, values, collector):
        total = 0
        for value in values:
            total += value[0]
        collector.collect((key, total))

windowed_stream.apply(WindowSum())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Window Reduce</strong><br>PythonWindowedStream → PythonDataStream</td>
      <td>
        <p>Applies a functional reduce function to the window and returns the reduced value.</p>
{% highlight python %}
class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, val1 = input1
        count2, val2 = input2
        return (count1 + count2, val1)

windowed_stream.reduce(Sum())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Union</strong><br>PythonDataStream* → PythonDataStream</td>
      <td>
        <p>Union of two or more data streams creating a new stream containing all the elements from all
        the streams.
        Note: If you union a data stream with itself you will get each element twice
        in the resulting stream.</p>
{% highlight python %}
data_stream.union(other_stream1, other_stream2, ...)
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Split</strong><br>PythonDataStream → PythonSplitStream</td>
      <td>
        <p>Split the stream into two or more streams according to some criterion.</p>
{% highlight python %}
class StreamSelector(OutputSelector):
    def select(self, value):
        return ["even"] if value % 2 == 0 else ["odd"]

split_stream = data_stream.split(StreamSelector())
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Select</strong><br>PythonSplitStream → PythonDataStream</td>
      <td>
        <p>Select one or more streams from a split stream.</p>
{% highlight python %}
even_data_stream = split_stream.select("even")
odd_data_stream = split_stream.select("odd")
all_data_stream = split_stream.select("even", "odd")
{% endhighlight %}
      </td>
    </tr>

    <tr>
      <td><strong>Iterate</strong><br>PythonDataStream → PythonIterativeStream → PythonDataStream</td>
      <td>
        <p>Creates a "feedback" loop in the flow, by redirecting the output of one operator
        to some previous operator. This is especially useful for defining algorithms that
        continuously update a model. The following code starts with a stream and applies
        the iteration body continuously. Elements that are greater than 0 are sent back
        to the feedback channel, and the rest of the elements are forwarded downstream.
        See <a href="#iterations">iterations</a> for a complete description.</p>
{% highlight python %}
class MinusOne(MapFunction):
    def map(self, value):
        return value - 1

class PositiveNumber(FilterFunction):
    def filter(self, value):
        return value > 0

class LessEqualToZero(FilterFunction):
    def filter(self, value):
        return value <= 0

iteration = initial_stream.iterate(5000)
iteration_body = iteration.map(MinusOne())
feedback = iteration_body.filter(PositiveNumber())
iteration.close_with(feedback)
output = iteration_body.filter(LessEqualToZero())
{% endhighlight %}
      </td>
    </tr>

  </tbody>
</table>

{% top %}

Passing Functions to Flink
--------------------------

Certain operations require user-defined functions as arguments. All the functions should be
defined as Python classes that derive from the relevant Flink function class. User-defined functions
are serialized and sent over to the TaskManagers for execution.

{% highlight python %}
class Filter(FilterFunction):
    def filter(self, value):
        return value > 5

data_stream.filter(Filter())
{% endhighlight %}

Rich functions (e.g. `RichFileterFunction`) make it possible to define (override) the optional
operations `open` and `close`.
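For instance, a filter that uses `open` to initialize per-task state before any elements are processed might look like the following minimal sketch (the `ThresholdFilter` class and its threshold logic are illustrative only; the `open` signature is assumed to receive the job configuration, mirroring the underlying Java `RichFunction` interface):

{% highlight python %}
from org.apache.flink.api.common.functions import RichFilterFunction


class ThresholdFilter(RichFilterFunction):
    def open(self, config):
        # Called once per task before the first filter() call;
        # a good place to initialize state or acquire resources.
        self._threshold = 5

    def filter(self, value):
        return value > self._threshold

    def close(self):
        # Called once per task after the last filter() call;
        # release anything acquired in open().
        pass


data_stream.filter(ThresholdFilter())
{% endhighlight %}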
--- End diff --

    typo: Fileter -> Filter


> Python API for streaming applications
> -------------------------------------
>
>                 Key: FLINK-5886
>                 URL: https://issues.apache.org/jira/browse/FLINK-5886
>             Project: Flink
>          Issue Type: New Feature
>          Components: Python API
>            Reporter: Zohar Mizrahi
>            Assignee: Zohar Mizrahi
>
> A work in progress to provide a Python interface for the Flink streaming APIs. The
> core technology is based on Jython and thus imposes two limitations: a. user-defined
> functions cannot use Python C extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, one can use the IntelliJ IDE. Assuming IntelliJ was
> set up properly (see:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}},
> which in turn will execute all the tests under
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)