[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019490#comment-16019490
 ] 

ASF GitHub Bot commented on FLINK-5886:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3838#discussion_r117722927
  
    --- Diff: docs/dev/stream/python.md ---
    @@ -0,0 +1,649 @@
    +---
    +title: "Python Programming Guide (Streaming)"
    +is_beta: true
    +nav-title: Python API
    +nav-parent_id: streaming
    +nav-pos: 63
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +Analysis streaming programs in Flink are regular programs that implement 
transformations on
    +streaming data sets (e.g., filtering, mapping, joining, grouping). The 
streaming data sets are initially
    +created from certain sources (e.g., by reading from Apache Kafka, or 
reading files, or from collections).
    +Results are returned via sinks, which may for example write the data to 
(distributed) files, or to
    +standard output (for example the command line terminal). Flink streaming 
programs run in a variety
    +of contexts, standalone, or embedded in other programs. The execution can 
happen in a local JVM, or
    +on clusters of many machines.
    +
    +In order to create your own Flink streaming program, we encourage you to 
start with the
    +[program skeleton](#program-skeleton) and gradually add your own
    +[transformations](#transformations). The remaining sections act as 
references for additional
    +operations and advanced features.
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +Jython Framework
    +---------------
    +Flink Python streaming API uses Jython framework (see 
<http://www.jython.org/archive/21/docs/whatis.html>)
    +to drive the execution of a given script. The Python streaming layer, is 
actually a thin wrapper layer for the
    +existing Java streaming APIs.
    +
    +#### Constraints
    +There are two main constraints for using Jython:
    +
    +* The latest Python supported version is 2.7
    +* It is not straightforward to use Python C extensions
    +
    +Streaming Program Example
    +-------------------------
    +The following streaming program is a complete, working example of 
WordCount. You can copy &amp; paste the code
    +to run it locally (see notes later in this section). It counts the number 
of each word (case insensitive)
    +in a stream of sentences, on a window size of 50 milliseconds and prints 
the results into the standard output.
    +
    +{% highlight python %}
    +from org.apache.flink.streaming.api.functions.source import SourceFunction
    +from org.apache.flink.api.common.functions import FlatMapFunction, 
ReduceFunction
    +from org.apache.flink.api.java.functions import KeySelector
    +from org.apache.flink.python.api.jython import 
PythonStreamExecutionEnvironment
    +from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
    +
    +
    +class Generator(SourceFunction):
    +    def __init__(self, num_iters):
    +        self._running = True
    +        self._num_iters = num_iters
    +
    +    def run(self, ctx):
    +        counter = 0
    +        while self._running and counter < self._num_iters:
    +            ctx.collect('Hello World')
    +            counter += 1
    +
    +    def cancel(self):
    +        self._running = False
    +
    +
    +class Tokenizer(FlatMapFunction):
    +    def flatMap(self, value, collector):
    +        for word in value.lower().split():
    +            collector.collect((1, word))
    +
    +
    +class Selector(KeySelector):
    +    def getKey(self, input):
    +        return input[1]
    +
    +
    +class Sum(ReduceFunction):
    +    def reduce(self, input1, input2):
    +        count1, word1 = input1
    +        count2, word2 = input2
    +        return (count1 + count2, word1)
    +
    +def main():
    +    env = PythonStreamExecutionEnvironment.get_execution_environment()
    +    env.create_python_source(Generator(num_iters=1000)) \
    +        .flat_map(Tokenizer()) \
    +        .key_by(Selector()) \
    +        .time_window(milliseconds(50)) \
    +        .reduce(Sum()) \
    +        .print()
    +    env.execute()
    +
    +
    +if __name__ == '__main__':
    +    main()
    +{% endhighlight %}
    +
    +**Notes:**
    +
    +- If execution is done on a local cluster, you may replace the last line 
in the `main()` function
    +  with **`env.execute(True)`**
    +- Execution on a multi-node cluster requires a shared medium storage, 
which needs to be configured (.e.g HDFS)
    +  upfront.
    +- The output from of the given script is directed to the standard output. 
Consequently, the output
    +  is written to the corresponding worker `.out` filed. If the script is 
executed inside the IntelliJ IDE,
    +  then the output will be displayed in the console tab.
    +
    +{% top %}
    +
    +Program Skeleton
    +----------------
    +As we already saw in the example, Flink streaming programs look like 
regular Python programs.
    +Each program consists of the same basic parts:
    +
    +1. A `main()` function definition, without arguments - the program entry 
point,
    +2. Obtain an `Environment`,
    +3. Load/create the initial data,
    +4. Specify transformations on this data,
    +5. Specify where to put the results of your computations, and
    +5. Execute your program.
    +
    +We will now give an overview of each of those steps but please refer to 
the respective sections for
    +more details.
    +
    +The `main()` function is a must and it is used by Flink execution layer to 
run the
    +given Python streaming program.
    +
    +The `Environment` is the basis for all Flink programs. You can
    +obtain one using these static methods on class 
`PythonStreamExecutionEnvironment`:
    +
    +{% highlight python %}
    +get_execution_environment()
    +{% endhighlight %}
    +
    +For specifying data sources the streaming execution environment has 
several methods.
    +To just read a text file as a sequence of lines, you can use:
    +
    +{% highlight python %}
    +env = get_execution_environment()
    +text = env.read_text_file("file:///path/to/file")
    +{% endhighlight %}
    +
    +This will give you a DataStream on which you can then apply 
transformations. For
    +more information on data sources and input formats, please refer to
    +[Data Sources](#data-sources).
    +
    +Once you have a DataStream you can apply transformations to create a new
    +DataStream which you can then write to a file, transform again, or
    +combine with other DataStreams. You apply transformations by calling
    +methods on DataStream with your own custom transformation function. For 
example,
    +a map transformation looks like this:
    +
    +{% highlight python %}
    +class Doubler(MapFunction):
    +    def map(self, value):
    +        return value * 2
    +
    +data.map(Doubler())
    +{% endhighlight %}
    +
    +This will create a new DataStream by doubling every value in the original 
DataStream.
    +For more information and a list of all the transformations,
    +please refer to [Transformations](#transformations).
    +
    +Once you have a DataStream that needs to be written to disk you can call 
one
    +of these methods on DataStream:
    +
    +{% highlight python %}
    +data.write_as_text("<file-path>")
    +data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
    +data.print()
    +{% endhighlight %}
    +
    +The last method is only useful for developing/debugging on a local machine,
    +it will output the contents of the DataSet to standard output. (Note that 
in
    +a cluster, the result goes to the standard out stream of the cluster nodes 
and ends
    +up in the *.out* files of the workers).
    +The first two do as the name suggests.
    +Please refer to [Data Sinks](#data-sinks) for more information on writing 
to files.
    +
    +Once you specified the complete program you need to call `execute` on
    +the `Environment`. This will either execute on your local machine or 
submit your program
    +for execution on a cluster, depending on how Flink was started. You can 
force
    +a local execution by using `execute(True)`.
    +
    +{% top %}
    +
    +Project setup
    +---------------
    +
    +Apart from setting up Flink, no additional work is required. Using Jython 
to execute the Python
    +script, means that no external packages are needed and the program is 
executed as if it was a jar file.
    +
    +The Python API was tested on Linux/OSX systems.
    +
    +{% top %}
    +
    +Lazy Evaluation
    +---------------
    +
    +All Flink programs are executed lazily: When the program's main method is 
executed, the data loading
    +and transformations do not happen directly. Rather, each operation is 
created and added to the
    +program's plan. The operations are actually executed when one of the 
`execute()` methods is invoked
    +on the Environment object. Whether the program is executed locally or on a 
cluster depends
    +on the environment of the program.
    +
    +The lazy evaluation lets you construct sophisticated programs that Flink 
executes as one
    +holistically planned unit.
    +
    +{% top %}
    +
    +Transformations
    +---------------
    +
    +Data transformations transform one or more DataStreams into a new 
DataStream. Programs can combine
    +multiple transformations into sophisticated assemblies.
    +
    +This section gives a brief overview of the available transformations. The 
[transformations
    +documentation](dataset_transformations.html) has a full description of all 
transformations with
    +examples.
    +
    +<br />
    +
    +<table class="table table-bordered">
    +  <thead>
    +    <tr>
    +      <th class="text-left" style="width: 20%">Transformation</th>
    +      <th class="text-center">Description</th>
    +    </tr>
    +  </thead>
    +
    +  <tbody>
    +    <tr>
    +      <td><strong>Map</strong><br>PythonDataStream &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>Takes one element and produces one element.</p>
    +{% highlight python %}
    +class Doubler(MapFunction):
    +    def map(self, value):
    +        return value * 2
    +
    +data_stream.map(Doubler())
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>FlatMap</strong><br>PythonDataStream &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>Takes one element and produces zero, one, or more elements. </p>
    +{% highlight python %}
    +class Tokenizer(FlatMapFunction):
    +    def flatMap(self, word, collector):
    +        collector.collect((1, word))
    +
    +data_stream.flat_map(Tokenizer())
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Filter</strong><br>PythonDataStream &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>Evaluates a boolean function for each element and retains those 
for which the function
    +        returns true.</p>
    +{% highlight python %}
    +class GreaterThen1000(FilterFunction):
    +    def filter(self, value):
    +        return value > 1000
    +
    +data_stream.filter(GreaterThen1000())
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>KeyBy</strong><br>PythonDataStream &rarr; 
PythonKeyedStream</td>
    +      <td>
    +        <p>Logically partitions a stream into disjoint partitions, each 
partition containing elements of the same key.
    +        Internally, this is implemented with hash partitioning. See <a 
href="/dev/api_concepts#specifying-keys">keys</a> on how to specify keys.
    +        This transformation returns a PythonKeyedDataStream.</p>
    +    {% highlight python %}
    +class Selector(KeySelector):
    +    def getKey(self, input):
    +        return input[1]  # Key by the second element in a tuple
    +
    +data_stream.key_by(Selector()) // Key by field "someKey"
    +    {% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Reduce</strong><br>PythonKeyedStream &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>A "rolling" reduce on a keyed data stream. Combines the current 
element with the last reduced value and
    +                        emits the new value.</p>
    +{% highlight python %}
    +class Sum(ReduceFunction):
    +    def reduce(self, input1, input2):
    +        count1, val1 = input1
    +        count2, val2 = input2
    +        return (count1 + count2, val1)
    +
    +data.reduce(Sum())
    +{% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Window</strong><br>PythonKeyedStream &rarr; 
PythonWindowedStream</td>
    +      <td>
    +        <p>Windows can be defined on already partitioned KeyedStreams. 
Windows group the data in each
    +        key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
    +        See <a href="windows.html">windows</a> for a complete description 
of windows.
    +    {% highlight python %}
    +keyed_stream.count_window(10, 5)  # Last 10 elements, sliding (jumping) by 
5 elements
    +
    +keyed_stream.time_window(milliseconds(30))  # Last 30 milliseconds of data
    +
    +keted_stream.time_window(milliseconds(100), milliseconds(20))  # Last 100 
milliseconds of data, sliding (jumping) by 20 milliseconds
    +    {% endhighlight %}
    +        </p>
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Window Apply</strong><br>PythonWindowedStream &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>Applies a general function to the window as a whole. Below is a 
function that manually sums
    +           the elements of a window.</p>
    +    {% highlight python %}
    +class WindowSum(WindowFunction):
    +    def apply(self, key, window, values, collector):
    +        sum = 0
    +        for value in values:
    +            sum += value[0]
    +        collector.collect((key, sum))
    +
    +windowed_stream.apply(WindowSum())
    +    {% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Window Reduce</strong><br>PythonWindowedStream &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>Applies a functional reduce function to the window and returns 
the reduced value.</p>
    +    {% highlight python %}
    +class Sum(ReduceFunction):
    +    def reduce(self, input1, input2):
    +        count1, val1 = input1
    +        count2, val2 = input2
    +        return (count1 + count2, val1)
    +
    +windowed_stream.reduce(Sum())
    +    {% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +    <td><strong>Union</strong><br>PythonDataStream* &rarr; 
PythonDataStream</td>
    +      <td>
    +        <p>Union of two or more data streams creating a new stream 
containing all the elements from all
    +           the streams. Note: If you union a data stream with itself you 
will get each element twice
    +           in the resulting stream.</p>
    +    {% highlight python %}
    +data_stream.union(other_stream1, other_stream2, ...);
    +    {% endhighlight %}
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Split</strong><br>PythonDataStream &rarr; 
PythonSplitStream</td>
    +      <td>
    +        <p>Split the stream into two or more streams according to some 
criterion.
    +    {% highlight python %}
    +class StreamSelector(OutputSelector):
    +    def select(self, value):
    +        return ["even"] if value % 2 == 0 else ["odd"]
    +
    +splited_stream = data_stream.split(StreamSelector())
    +    {% endhighlight %}
    +        </p>
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Select</strong><br>SplitStream &rarr; DataStream</td>
    +      <td>
    +        <p> Select one or more streams from a split stream.
    +    {% highlight python %}
    +even_data_stream = splited_stream.select("even")
    +odd_data_stream = splited_stream.select("odd")
    +all_data_stream = splited_stream.select("even", "odd")
    +    {% endhighlight %}
    +        </p>
    +      </td>
    +    </tr>
    +
    +    <tr>
    +      <td><strong>Iterate</strong><br>PythonDataStream &rarr; 
PythonIterativeStream &rarr; PythonDataStream</td>
    +      <td>
    +        <p> Creates a "feedback" loop in the flow, by redirecting the 
output of one operator
    +            to some previous operator. This is especially useful for 
defining algorithms that
    +            continuously update a model. The following code starts with a 
stream and applies
    +            the iteration body continuously. Elements that are greater 
than 0 are sent back
    +            to the feedback channel, and the rest of the elements are 
forwarded downstream.
    +            See <a href="#iterations">iterations</a> for a complete 
description.
    +        {% highlight java %}
    +class MinusOne(MapFunction):
    +    def map(self, value):
    +        return value - 1
    +
    +class PositiveNumber(FilterFunction):
    +    def filter(self, value):
    +        return value > 0
    +
    +class LessEquelToZero(FilterFunction):
    +    def filter(self, value):
    +        return value <= 0
    +
    +iteration = initial_stream.iterate(5000)
    +iteration_body = iteration.map(MinusOne())
    +feedback = iteration_body.filter(PositiveNumber())
    +iteration.close_with(feedback)
    +output = iteration_body.filter(LessEquelToZero())
    +        {% endhighlight %}
    +        </p>
    +      </td>
    +    </tr>
    +
    +  </tbody>
    +</table>
    +
    +{% top %}
    +
    +Passing Functions to Flink
    +--------------------------
    +
    +Certain operations require user-defined functions as arguments. All the 
functions should be
    +defined as Python classes that derived from the relevant Flink function. 
User-defined functions
    +are serialized and sent over to the TaskManagers for execution.
    +
    +{% highlight python %}
    +class Filter(FilterFunction):
    +    def filter(self, value):
    +        return value > 5
    +
    +data_stream.filter(Filter())
    +{% endhighlight %}
    +
    +Rich functions (.e.g `RichFileterFunction`) enable to define (override) 
the optional operations: `open` & `close`.
    --- End diff --
    
    type: Fileter -> Filter


> Python API for streaming applications
> -------------------------------------
>
>                 Key: FLINK-5886
>                 URL: https://issues.apache.org/jira/browse/FLINK-5886
>             Project: Flink
>          Issue Type: New Feature
>          Components: Python API
>            Reporter: Zohar Mizrahi
>            Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The 
> core technology is based on jython and thus imposes two limitations: a. user 
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was 
> setup properly (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
>  one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in return will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to