[ https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019484#comment-16019484 ]
ASF GitHub Bot commented on FLINK-5886:
---------------------------------------

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3838#discussion_r117722682

--- Diff: docs/dev/stream/python.md ---
@@ -0,0 +1,649 @@
+---
+title: "Python Programming Guide (Streaming)"
+is_beta: true
+nav-title: Python API
+nav-parent_id: streaming
+nav-pos: 63
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Analysis streaming programs in Flink are regular programs that implement transformations on
+streaming data sets (e.g., filtering, mapping, joining, grouping). The streaming data sets are initially
+created from certain sources (e.g., by reading from Apache Kafka, or reading files, or from collections).
+Results are returned via sinks, which may for example write the data to (distributed) files, or to
+standard output (for example the command line terminal). Flink streaming programs run in a variety
+of contexts, standalone, or embedded in other programs. The execution can happen in a local JVM, or
+on clusters of many machines.
+
+In order to create your own Flink streaming program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references for additional
+operations and advanced features.
+
+* This will be replaced by the TOC
+{:toc}
+
+Jython Framework
+---------------
+Flink Python streaming API uses Jython framework (see <http://www.jython.org/archive/21/docs/whatis.html>)
+to drive the execution of a given script. The Python streaming layer, is actually a thin wrapper layer for the
+existing Java streaming APIs.
+
+#### Constraints
+There are two main constraints for using Jython:
+
+* The latest Python supported version is 2.7
+* It is not straightforward to use Python C extensions
+
+Streaming Program Example
+-------------------------
+The following streaming program is a complete, working example of WordCount. You can copy & paste the code
+to run it locally (see notes later in this section). It counts the number of each word (case insensitive)
+in a stream of sentences, on a window size of 50 milliseconds and prints the results into the standard output.
+
+{% highlight python %}
+from org.apache.flink.streaming.api.functions.source import SourceFunction
+from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
+from org.apache.flink.api.java.functions import KeySelector
+from org.apache.flink.python.api.jython import PythonStreamExecutionEnvironment
+from org.apache.flink.streaming.api.windowing.time.Time import milliseconds
+
+
+class Generator(SourceFunction):
+    def __init__(self, num_iters):
+        self._running = True
+        self._num_iters = num_iters
+
+    def run(self, ctx):
+        counter = 0
+        while self._running and counter < self._num_iters:
+            ctx.collect('Hello World')
+            counter += 1
+
+    def cancel(self):
+        self._running = False
+
+
+class Tokenizer(FlatMapFunction):
+    def flatMap(self, value, collector):
+        for word in value.lower().split():
+            collector.collect((1, word))
+
+
+class Selector(KeySelector):
+    def getKey(self, input):
+        return input[1]
+
+
+class Sum(ReduceFunction):
+    def reduce(self, input1, input2):
+        count1, word1 = input1
+        count2, word2 = input2
+        return (count1 + count2, word1)
+
+def main():
+    env = PythonStreamExecutionEnvironment.get_execution_environment()
+    env.create_python_source(Generator(num_iters=1000)) \
+        .flat_map(Tokenizer()) \
+        .key_by(Selector()) \
+        .time_window(milliseconds(50)) \
+        .reduce(Sum()) \
+        .print()
+    env.execute()
+
+
+if __name__ == '__main__':
+    main()
+{% endhighlight %}
+
+**Notes:**
+
+- If execution is done on a local cluster, you may replace the last line in the `main()` function
+  with **`env.execute(True)`**
+- Execution on a multi-node cluster requires a shared medium storage, which needs to be configured (.e.g HDFS)
+  upfront.
+- The output from of the given script is directed to the standard output. Consequently, the output
+  is written to the corresponding worker `.out` filed.
If the script is executed inside the IntelliJ IDE,
--- End diff --

typo: filed -> file

> Python API for streaming applications
> -------------------------------------
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
> Issue Type: New Feature
> Components: Python API
> Reporter: Zohar Mizrahi
> Assignee: Zohar Mizrahi
>
> A work in progress to provide python interface for Flink streaming APIs. The
> core technology is based on jython and thus imposes two limitations: a. user
> defined functions cannot use python extensions. b. the python version is 2.x
> The branch is based on Flink release 1.2.0, as can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> In order to test it, someone can use IntelliJ IDE. Assuming IntelliJ was
> setup properly (see:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}},
> which in return will execute all the tests under
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)