[FLINK-671] Python API

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d182daa1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d182daa1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d182daa1

Branch: refs/heads/master
Commit: d182daa19eeb1d3877821d4a8b9a37311678a12f
Parents: af9248c
Author: zentol <s.mo...@web.de>
Authored: Tue Mar 3 20:53:14 2015 +0100
Committer: zentol <s.mo...@web.de>
Committed: Tue Apr 21 13:37:29 2015 +0200

----------------------------------------------------------------------
 docs/_includes/sidenav.html                     |    1 +
 docs/dataset_transformations.md                 |  280 ++++-
 docs/python_programming_guide.md                |  610 ++++++++++
 flink-dist/pom.xml                              |    6 +
 flink-dist/src/main/assemblies/bin.xml          |    7 +-
 flink-dist/src/main/flink-bin/bin/flink         |    1 +
 flink-dist/src/main/flink-bin/bin/pyflink2.sh   |   25 +
 flink-dist/src/main/flink-bin/bin/pyflink3.sh   |   25 +
 .../flink-language-binding/flink-python/pom.xml |   86 ++
 .../api/java/python/PythonPlanBinder.java       |  481 ++++++++
 .../java/python/functions/PythonCoGroup.java    |   78 ++
 .../python/functions/PythonCombineIdentity.java |   79 ++
 .../python/functions/PythonMapPartition.java    |   70 ++
 .../java/python/streaming/PythonStreamer.java   |  175 +++
 .../languagebinding/api/python/__init__.py      |   17 +
 .../languagebinding/api/python/dill/__diff.py   |  247 ++++
 .../languagebinding/api/python/dill/__init__.py |   91 ++
 .../languagebinding/api/python/dill/_objects.py |  548 +++++++++
 .../languagebinding/api/python/dill/detect.py   |  240 ++++
 .../languagebinding/api/python/dill/dill.py     | 1052 ++++++++++++++++++
 .../languagebinding/api/python/dill/info.py     |   17 +
 .../languagebinding/api/python/dill/objtypes.py |   45 +
 .../languagebinding/api/python/dill/pointers.py |  140 +++
 .../languagebinding/api/python/dill/source.py   | 1028 +++++++++++++++++
 .../languagebinding/api/python/dill/temp.py     |  254 +++++
 .../languagebinding/api/python/executor.py      |   58 +
 .../api/python/flink/__init__.py                |   17 +
 .../api/python/flink/connection/Collector.py    |  161 +++
 .../api/python/flink/connection/Connection.py   |  172 +++
 .../api/python/flink/connection/Constants.py    |   31 +
 .../api/python/flink/connection/Iterator.py     |  327 ++++++
 .../api/python/flink/connection/__init__.py     |   17 +
 .../api/python/flink/connection/__init__.pyc    |  Bin 0 -> 243 bytes
 .../python/flink/example/TriangleEnumeration.py |  152 +++
 .../api/python/flink/example/WordCount.py       |   61 +
 .../api/python/flink/example/__init__.py        |   17 +
 .../python/flink/functions/CoGroupFunction.py   |   53 +
 .../api/python/flink/functions/CrossFunction.py |   34 +
 .../python/flink/functions/FilterFunction.py    |   38 +
 .../python/flink/functions/FlatMapFunction.py   |   44 +
 .../api/python/flink/functions/Function.py      |   92 ++
 .../flink/functions/GroupReduceFunction.py      |  127 +++
 .../api/python/flink/functions/JoinFunction.py  |   33 +
 .../api/python/flink/functions/MapFunction.py   |   36 +
 .../flink/functions/MapPartitionFunction.py     |   34 +
 .../python/flink/functions/ReduceFunction.py    |  123 ++
 .../python/flink/functions/RuntimeContext.py    |   30 +
 .../api/python/flink/functions/__init__.py      |   17 +
 .../api/python/flink/plan/Constants.py          |  106 ++
 .../api/python/flink/plan/DataSet.py            |  907 +++++++++++++++
 .../api/python/flink/plan/Environment.py        |  339 ++++++
 .../api/python/flink/plan/__init__.py           |   17 +
 .../api/python/flink/utilities/__init__.py      |   36 +
 .../flink/languagebinding/api/python/setup.py   |   33 +
 .../api/java/python/PythonPlanBinderTest.java   |   89 ++
 .../api/python/flink/test/data_csv              |    2 +
 .../api/python/flink/test/data_text             |    2 +
 .../api/python/flink/test/test_csv.py           |   30 +
 .../api/python/flink/test/test_main.py          |  264 +++++
 .../api/python/flink/test/test_text.py          |   30 +
 .../python/flink/test/test_type_deduction.py    |   63 ++
 .../api/python/flink/test/test_types.py         |   70 ++
 flink-staging/flink-language-binding/pom.xml    |    1 +
 pom.xml                                         |    5 +-
 64 files changed, 9267 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/docs/_includes/sidenav.html
----------------------------------------------------------------------
diff --git a/docs/_includes/sidenav.html b/docs/_includes/sidenav.html
index 216587c..f3c692d 100644
--- a/docs/_includes/sidenav.html
+++ b/docs/_includes/sidenav.html
@@ -41,6 +41,7 @@ under the License.
   <li><div class="sidenav-item"><a href="{{ site.baseurl 
}}/spargel_guide.html">Spargel Graph API</a></div></li>
   <li><div class="sidenav-item"><a href="{{ site.baseurl 
}}/gelly_guide.html">Gelly Graph API</a> <small>Beta</small></div></li>
   <li><div class="sidenav-item"><a href="{{ site.baseurl }}/table.html">Table 
API - Relational Queries</a> <small>Beta</small></div></li>
+  <li><div class="sidenav-item"><a href="{{ site.baseurl }}/python_programming_guide.html">Python API</a></div></li>
   <li><div class="sidenav-item"><a href="{{ site.baseurl 
}}/hadoop_compatibility.html">Hadoop Compatibility</a> 
<small>Beta</small></div></li>
     <li><div class="sidenav-item-bottom"><a href="{{ site.baseurl 
}}/flink_on_tez_guide.html">Running Flink on Tez</a> 
<small>Beta</small></div></li>
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index dba0295..ce48ca1 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -61,6 +61,13 @@ val intSums = intPairs.map { pair => pair._1 + pair._2 }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ intSums = intPairs.map(lambda x: sum(x), INT)
+~~~
+
+</div>
 </div>
 
 ### FlatMap
@@ -98,6 +105,13 @@ val words = textLines.flatMap { _.split(" ") }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ words = lines.flat_map(lambda x, c: x.split(), STRING)
+~~~
+
+</div>
 </div>
 
 ### MapPartition
@@ -139,6 +153,13 @@ val counts = texLines.mapPartition { in => Some(in.size) }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ counts = lines.map_partition(lambda x,c: [sum(1 for _ in x)], INT)
+~~~
+
+</div>
 </div>
 
 ### Filter
@@ -173,12 +194,19 @@ val naturalNumbers = intNumbers.filter { _ > 0 }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ naturalNumbers = intNumbers.filter(lambda x: x > 0)
+~~~
+
+</div>
 </div>
 
 **IMPORTANT:** The system assumes that the function does not modify the 
elements on which the predicate is applied. Violating this assumption
 can lead to incorrect results.
 
-### Project (Tuple DataSets only) (Java API Only)
+### Project (Tuple DataSets only) (Java/Python API Only)
 
 The Project transformation removes or moves Tuple fields of a Tuple DataSet.
 The `project(int...)` method selects Tuple fields that should be retained by 
their index and defines their order in the output Tuple.
@@ -187,6 +215,9 @@ Projections do not require the definition of a user 
function.
 
 The following code shows different ways to apply a Project transformation on a 
DataSet:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+
 ~~~java
 DataSet<Tuple3<Integer, Double, String>> in = // [...]
 // converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
@@ -220,6 +251,23 @@ be used for grouping can be done in many ways:
 
 Please look at the reduce examples to see how the grouping keys are specified.
 
+</div>
+<div data-lang="python" markdown="1">
+
+~~~python
+out = data.project(2,0)
+~~~
+
+### Transformations on Grouped DataSet
+
+The reduce operations can operate on grouped data sets. Specifying the key to
+be used for grouping can be done using one or more field position keys (Tuple 
DataSet only).
+
+Please look at the reduce examples to see how the grouping keys are specified.
+
+</div>
+</div>
+
 ### Reduce on Grouped DataSet
 
 A Reduce transformation that is applied on a grouped DataSet reduces each 
group to a single
@@ -281,6 +329,12 @@ val wordCounts = words.groupBy { _.word } reduce {
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+</div>
 </div>
 
 #### Reduce on DataSet Grouped by Field Position Keys (Tuple DataSets only)
@@ -323,6 +377,13 @@ val reducedTuples = tuples.groupBy("a", "b").reduce { ... }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ reducedTuples = tuples.group_by(0, 1).reduce( ... )
+~~~
+
+</div>
 </div>
 
 ### GroupReduce on Grouped DataSet
@@ -387,6 +448,22 @@ Works analogous to grouping by Case Class fields in 
*Reduce* transformations.
 
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class DistinctReduce(GroupReduceFunction):
+   def reduce(self, iterator, collector):
+     dic = dict()
+     for value in iterator:
+       dic[value[1]] = 1
+     for key in dic.keys():
+       collector.collect(key)
+
+ output = data.group_by(0).reduce_group(DistinctReduce(), STRING)
+~~~
+
+
+</div>
 </div>
 
 #### GroupReduce on DataSet Grouped by KeySelector Function
@@ -451,6 +528,22 @@ val output = input.groupBy(0).sortGroup(1, 
Order.ASCENDING).reduceGroup {
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class DistinctReduce(GroupReduceFunction):
+   def reduce(self, iterator, collector):
+     dic = dict()
+     for value in iterator:
+       dic[value[1]] = 1
+     for key in dic.keys():
+       collector.collect(key)
+
+ output = data.group_by(0).sort_group(1, 
Order.ASCENDING).reduce_group(DistinctReduce(), STRING)
+~~~
+
+
+</div>
 </div>
 
 **Note:** A GroupSort often comes for free if the grouping is established 
using a sort-based execution strategy of an operator before the reduce 
operation.
@@ -537,6 +630,24 @@ class MyCombinableGroupReducer
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class GroupReduce(GroupReduceFunction):
+   def reduce(self, iterator, collector):
+     key, int_sum, float_sum = iterator.next()
+     for value in iterator:
+       int_sum += value[1]
+       float_sum += value[2]
+     collector.collect((key, int_sum, float_sum))
+   # in some cases combine() calls can simply be forwarded to reduce().
+   def combine(self, iterator, collector):
+     return self.reduce(iterator, collector)
+
+data.reduce_group(GroupReduce(), (STRING, INT, FLOAT), combinable=True)
+~~~
+
+</div>
 </div>
 
 ### GroupCombine on a Grouped DataSet
@@ -664,6 +775,13 @@ val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
 </div>
 
 To apply multiple aggregations on a DataSet it is necessary to use the 
`.and()` function after the first aggregate, that means `.aggregate(SUM, 
0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the 
original DataSet.
@@ -704,6 +822,14 @@ val sum = intNumbers.reduce (_ + _)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ intNumbers = env.from_elements(1,2,3)
+ sum = intNumbers.reduce(lambda x,y: x + y)
+~~~
+
+</div>
 </div>
 
 Reducing a full DataSet using the Reduce transformation implies that the final 
Reduce operation cannot be done in parallel. However, a reduce function is 
automatically combinable such that a Reduce transformation does not limit 
scalability for most use cases.
@@ -733,6 +859,13 @@ val output = input.reduceGroup(new MyGroupReducer())
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ output = data.reduce_group(MyGroupReducer(), ... )
+~~~
+
+</div>
 </div>
 
 **Note:** A GroupReduce transformation on a full DataSet cannot be done in 
parallel if the
@@ -779,6 +912,13 @@ val output = input.aggregate(SUM, 0).and(MIN, 2)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
 </div>
 
 **Note:** Extending the set of supported aggregation functions is on our 
roadmap.
@@ -825,6 +965,13 @@ val result = input1.join(input2).where(0).equalTo(1)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ result = input1.join(input2).where(0).equal_to(1)
+~~~
+
+</div>
 </div>
 
 #### Join with Join-Function
@@ -887,6 +1034,20 @@ val weightedRatings = 
ratings.join(weights).where("category").equalTo(0) {
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class PointWeighter(JoinFunction):
+   def join(self, rating, weight):
+     return (rating[0], rating[1] * weight[1])
+
+ weightedRatings = \
+   ratings.join(weights).where(0).equal_to(0) \
+   .using(PointWeighter(), (STRING, FLOAT))
+~~~
+
+</div>
 </div>
 
 #### Join with Flat-Join Function
@@ -915,7 +1076,7 @@ DataSet<Tuple2<String, Double>>
             ratings.join(weights) // [...]
 ~~~
 
-#### Join with Projection (Java Only)
+#### Join with Projection (Java/Python Only)
 
 A Join transformation can construct result tuples using a projection as shown 
here:
 
@@ -953,6 +1114,21 @@ val weightedRatings = 
ratings.join(weights).where("category").equalTo(0) {
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+#### Join with Projection (Java/Python Only)
+
+A Join transformation can construct result tuples using a projection as shown 
here:
+
+~~~python
+ result = input1.join(input2).where(0).equal_to(0) \
+   .project_first(0,2).project_second(1).project_first(1)
+~~~
+
+`project_first(int...)` and `project_second(int...)` select the fields of the 
first and second joined input that should be assembled into an output Tuple. 
The order of indexes defines the order of fields in the output tuple.
+The join projection works also for non-Tuple DataSets. In this case, 
`project_first()` or `project_second()` must be called without arguments to add 
a joined element to the output Tuple.
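+
+A hedged sketch of the non-Tuple case (the inputs and the keys used here are illustrative):
+
+~~~python
+ result = input1.join(input2).where(0).equal_to(0) \
+   .project_first().project_second(1)
+~~~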
+
+</div>
 </div>
 
 #### Join with DataSet Size Hint
@@ -997,6 +1173,19 @@ val result1 = 
input1.joinWithHuge(input2).where(0).equalTo(0)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+
+ #hint that the second DataSet is very small
+ result1 = input1.join_with_tiny(input2).where(0).equal_to(0)
+
+ #hint that the second DataSet is very large
+ result1 = input1.join_with_huge(input2).where(0).equal_to(0)
+
+~~~
+
+</div>
 </div>
 
 #### Join Algorithm Hints
@@ -1030,6 +1219,13 @@ val result1 = input1.join(input2, 
BROADCAST_HASH_FIRST).where("id").equalTo("key
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
 </div>
 
 The following hints are available:
@@ -1136,6 +1332,27 @@ val distances = coords1.cross(coords2) {
 
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class Euclid(CrossFunction):
+   def cross(self, c1, c2):
+     return (c1[0], c2[0], sqrt(pow(c1[1] - c2[1], 2) + pow(c1[2] - c2[2], 2)))
+
+ distances = coords1.cross(coords2).using(Euclid(), (INT,INT,FLOAT))
+~~~
+
+#### Cross with Projection
+
+A Cross transformation can also construct result tuples using a projection as 
shown here:
+
+~~~python
+result = input1.cross(input2).project_first(1,0).project_second(0,1)
+~~~
+
+The field selection in a Cross projection works the same way as in the 
projection of Join results.
+
+</div>
 </div>
 
 #### Cross with DataSet Size Hint
@@ -1180,6 +1397,18 @@ val result1 = input1.crossWithHuge(input2)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ #hint that the second DataSet is very small
+ result1 = input1.cross_with_tiny(input2)
+
+ #hint that the second DataSet is very large
+ result1 = input1.cross_with_huge(input2)
+
+~~~
+
+</div>
 </div>
 
 ### CoGroup
@@ -1254,6 +1483,25 @@ val output = iVals.coGroup(dVals).where(0).equalTo(0) {
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ class CoGroup(CoGroupFunction):
+   def co_group(self, ivals, dvals, collector):
+     ints = dict()
+     # add all Integer values in group to set
+     for value in ivals:
+       ints[value[1]] = 1
+     # multiply each Double value with each unique Integer values of group
+     for value in dvals:
+       for i in ints.keys():
+         collector.collect(value[1] * i)
+
+
+ output = ivals.co_group(dvals).where(0).equal_to(0).using(CoGroup(), DOUBLE)
+~~~
+
+</div>
 </div>
 
 
@@ -1283,6 +1531,13 @@ val unioned = vals1.union(vals2).union(vals3)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+ unioned = vals1.union(vals2).union(vals3)
+~~~
+
+</div>
 </div>
 
 ### Rebalance
@@ -1308,6 +1563,13 @@ val out = in.rebalance().map { ... }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
 </div>
 
 
@@ -1336,6 +1598,13 @@ val out = in.partitionByHash(0).mapPartition { ... }
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
 </div>
 
 ### Sort Partition
@@ -1411,4 +1680,11 @@ val out3 = in.groupBy(0).sortGroup(1, 
Order.ASCENDING).first(3)
 ~~~
 
 </div>
+<div data-lang="python" markdown="1">
+
+~~~python
+Not supported.
+~~~
+
+</div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/docs/python_programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/python_programming_guide.md b/docs/python_programming_guide.md
new file mode 100644
index 0000000..660086b
--- /dev/null
+++ b/docs/python_programming_guide.md
@@ -0,0 +1,610 @@
+---
+title: "Python Programming Guide"
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+
+<a href="#top"></a>
+
+Introduction
+------------
+
+Analysis programs in Flink are regular programs that implement transformations 
on data sets
+(e.g., filtering, mapping, joining, grouping). The data sets are initially 
created from certain
+sources (e.g., by reading files, or from collections). Results are returned 
via sinks, which may for
+example write the data to (distributed) files, or to standard output (for 
example the command line
+terminal). Flink programs run in a variety of contexts, standalone, or 
embedded in other programs.
+The execution can happen in a local JVM, or on clusters of many machines.
+
+In order to create your own Flink program, we encourage you to start with the
+[program skeleton](#program-skeleton) and gradually add your own
+[transformations](#transformations). The remaining sections act as references 
for additional
+operations and advanced features.
+
+
+Example Program
+---------------
+
+The following program is a complete, working example of WordCount. You can 
copy &amp; paste the code
+to run it locally.
+
+{% highlight python %}
+from flink.plan.Environment import get_environment
+from flink.plan.Constants import INT, STRING
+from flink.functions.GroupReduceFunction import GroupReduceFunction
+
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+if __name__ == "__main__":
+  env = get_environment()
+  data = env.from_elements("Who's there?",
+   "I think I hear them. Stand, ho! Who's there?")
+  
+  data \
+    .flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, 
STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+  
+  env.execute(local=True)
+{% endhighlight %}
+
+[Back to top](#top)
+
+Program Skeleton
+----------------
+
+As we already saw in the example, Flink programs look like regular Python
+programs with an `if __name__ == "__main__":` block. Each program consists of the same basic parts:
+
+1. Obtain an `Environment`,
+2. Load/create the initial data,
+3. Specify transformations on this data,
+4. Specify where to put the results of your computations, and
+5. Execute your program.
+
+We will now give an overview of each of those steps but please refer to the 
respective sections for
+more details. 
+
+
+The `Environment` is the basis for all Flink programs. You can
+obtain one using these static methods on class `Environment`:
+
+{% highlight python %}
+get_environment()
+{% endhighlight %}
+
+For specifying data sources the execution environment has several methods
+to read from files. To just read a text file as a sequence of lines, you can 
use:
+
+{% highlight python %}
+env = get_environment()
+text = env.read_text("file:///path/to/file")
+{% endhighlight %}
+
+This will give you a DataSet on which you can then apply transformations. For
+more information on data sources and input formats, please refer to
+[Data Sources](#data-sources).
+
+Once you have a DataSet you can apply transformations to create a new
+DataSet which you can then write to a file, transform again, or
+combine with other DataSets. You apply transformations by calling
+methods on DataSet with your own custom transformation function. For example,
+a map transformation looks like this:
+
+{% highlight python %}
+data.map(lambda x: x*2, INT)
+{% endhighlight %}
+
+This will create a new DataSet by doubling every value in the original 
DataSet. 
+For more information and a list of all the transformations,
+please refer to [Transformations](#transformations).
+
+Once you have a DataSet that needs to be written to disk you can call one
+of these methods on DataSet:
+
+{% highlight python %}
+data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
+data.write_csv("<file-path>", WriteMode=Constants.NO_OVERWRITE, line_delimiter='\n', field_delimiter=',')
+data.output()
+{% endhighlight %}
+
+The last method is only useful for developing/debugging on a local machine;
+it will output the contents of the DataSet to standard output. (Note that in
+a cluster, the result goes to the standard out stream of the cluster nodes and ends
+up in the *.out* files of the workers.)
+The first two methods do as their names suggest.
+Please refer to [Data Sinks](#data-sinks) for more information on writing to files.
+
+Once you specified the complete program you need to call `execute` on
+the `Environment`. This will either execute on your local machine or submit 
your program 
+for execution on a cluster, depending on how Flink was started. You can force
+a local execution by using `execute(local=True)`.
+
+[Back to top](#top)
+
+Project setup
+---------------
+
+Apart from setting up Flink, no additional work is required. The Python
+package can be found in the /resources folder of your Flink distribution. The
+flink package, along with the plan and optional packages, is automatically
+distributed across the cluster via HDFS when running a job.
+
+The Python API was tested on Linux systems that have Python 2.7 or 3.4 
installed.
+
+[Back to top](#top)
+
+Lazy Evaluation
+---------------
+
+All Flink programs are executed lazily: When the program's main method is 
executed, the data loading
+and transformations do not happen directly. Rather, each operation is created 
and added to the
+program's plan. The operations are actually executed when one of the 
`execute()` methods is invoked
+on the Environment object. Whether the program is executed locally or on a 
cluster depends
+on the environment of the program.
+
+The lazy evaluation lets you construct sophisticated programs that Flink 
executes as one
+holistically planned unit.
+
+[Back to top](#top)
+
+
+Transformations
+---------------
+
+Data transformations transform one or more DataSets into a new DataSet. 
Programs can combine
+multiple transformations into sophisticated assemblies.
+
+This section gives a brief overview of the available transformations. The 
[transformations
+documentation](dataset_transformations.html) has a full description of all 
transformations with
+examples.
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Transformation</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Map</strong></td>
+      <td>
+        <p>Takes one element and produces one element.</p>
+{% highlight python %}
+data.map(lambda x: x * 2, INT)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>FlatMap</strong></td>
+      <td>
+        <p>Takes one element and produces zero, one, or more elements. </p>
+{% highlight python %}
+data.flat_map(
+  lambda x,c: [(1, word) for word in x.lower().split()],
+  (INT, STRING))
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>MapPartition</strong></td>
+      <td>
+        <p>Transforms a parallel partition in a single function call. The function gets the partition
+        as an `Iterator` and can produce an arbitrary number of result values. The number of
+        elements in each partition depends on the degree-of-parallelism and previous operations.</p>
+{% highlight python %}
+data.map_partition(lambda x,c: [value * 2 for value in x], INT)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Filter</strong></td>
+      <td>
+        <p>Evaluates a boolean function for each element and retains those for 
which the function
+        returns true.</p>
+{% highlight python %}
+data.filter(lambda x: x > 1000)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Reduce</strong></td>
+      <td>
+        <p>Combines a group of elements into a single element by repeatedly 
combining two elements
+        into one. Reduce may be applied on a full data set, or on a grouped 
data set.</p>
+{% highlight python %}
+data.reduce(lambda x,y : x + y)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>ReduceGroup</strong></td>
+      <td>
+        <p>Combines a group of elements into one or more elements. ReduceGroup 
may be applied on a
+        full data set, or on a grouped data set.</p>
+{% highlight python %}
+class Adder(GroupReduceFunction):
+  def reduce(self, iterator, collector):
+    count, word = iterator.next()
+    count += sum([x[0] for x in iterator])
+    collector.collect((count, word))
+
+data.reduce_group(Adder(), (INT, STRING))
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Join</strong></td>
+      <td>
+        <p>Joins two data sets by creating all pairs of elements that are equal on their keys.
+        Optionally uses a JoinFunction to turn the pair of elements into a single element.
+        See <a href="#specifying-keys">keys</a> on how to define join keys.</p>
+{% highlight python %}
+# In this case tuple fields are used as keys. 
+# "0" is the join field on the first tuple
+# "1" is the join field on the second tuple.
+result = input1.join(input2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>CoGroup</strong></td>
+      <td>
+        <p>The two-dimensional variant of the reduce operation. Groups each 
input on one or more
+        fields and then joins the groups. The transformation function is 
called per pair of groups.
+        See <a href="#specifying-keys">keys</a> on how to define coGroup 
keys.</p>
+{% highlight python %}
+data1.co_group(data2).where(0).equal_to(1)
+{% endhighlight %}
+      </td>
+    </tr>
+
+    <tr>
+      <td><strong>Cross</strong></td>
+      <td>
+        <p>Builds the Cartesian product (cross product) of two inputs, 
creating all pairs of
+        elements. Optionally uses a CrossFunction to turn the pair of elements 
into a single
+        element.</p>
+{% highlight python %}
+result = data1.cross(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Union</strong></td>
+      <td>
+        <p>Produces the union of two data sets.</p>
+{% highlight python %}
+data.union(data2)
+{% endhighlight %}
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+[Back to Top](#top)
+
+
+Specifying Keys
+-------------
+
+Some transformations (like Join or CoGroup) require that a key is defined on
+their argument DataSets, and other transformations (Reduce, GroupReduce) allow
+the DataSet to be grouped on a key before they are applied.
+
+A DataSet is grouped as
+{% highlight python %}
+reduced = data \
+  .group_by(<define key here>) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data model of Flink is not based on key-value pairs. Therefore,
+you do not need to physically pack the data set types into keys and
+values. Keys are "virtual": they are defined as functions over the
+actual data to guide the grouping operator.
+
+### Define keys for Tuples
+{:.no_toc}
+
+The simplest case is grouping a data set of Tuples on one or more
+fields of the Tuple:
+{% highlight python %}
+reduced = data \
+  .group_by(0) \
+  .reduce_group(<do something>)
+{% endhighlight %}
+
+The data set is grouped on the first field of the tuples. 
+The group-reduce function will thus receive groups of tuples with
+the same value in the first field.
+
+{% highlight python %}
+grouped = data \
+  .group_by(0,1) \
+  .reduce(/*do something*/)
+{% endhighlight %}
+
+The data set is grouped on the composite key consisting of the first and the
+second fields, therefore the reduce function will receive groups
+with the same value for both fields.
+
+A note on nested Tuples: If you have a DataSet with a nested tuple,
+specifying `group_by(<index of tuple>)` will cause the system to use the full tuple as a key.
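+
+A minimal sketch of this behavior (the nested DataSet below is purely illustrative):
+
+{% highlight python %}
+# each element is ((a, b), c); group_by(0) keys on the whole inner tuple (a, b)
+nested = env.from_elements((("hello", 1), 2.0), (("world", 3), 4.0))
+grouped = nested.group_by(0)
+{% endhighlight %}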
+
+[Back to top](#top)
+
+
+Passing Functions to Flink
+--------------------------
+
+Certain operations require user-defined functions; all of them accept both
+lambda functions and rich functions as arguments.
+
+{% highlight python %}
+data.filter(lambda x: x > 5)
+{% endhighlight %}
+
+{% highlight python %}
+class Filter(FilterFunction):
+    def filter(self, value):
+        return value > 5
+
+data.filter(Filter())
+{% endhighlight %}
+
+Rich functions allow the use of imported functions, provide access to broadcast variables,
+can be parameterized using `__init__()`, and are the go-to option for complex functions.
+They are also the only way to define an optional `combine` function for a reduce operation.
+
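+As a minimal sketch, a rich function could take its parameters through `__init__()`
+(the `AtLeast` class and its `limit` parameter are purely illustrative):
+
+{% highlight python %}
+from flink.functions.FilterFunction import FilterFunction
+
+class AtLeast(FilterFunction):
+    def __init__(self, limit):
+        super(AtLeast, self).__init__()
+        # the threshold is passed in at construction time
+        self.limit = limit
+
+    def filter(self, value):
+        return value >= self.limit
+
+data.filter(AtLeast(5))
+{% endhighlight %}
+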
+Lambda functions allow the easy insertion of one-liners. Note that a lambda function has to return
+an iterable if the operation can return multiple values (that is, all functions that receive a collector argument).
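+
+For instance (a minimal sketch reusing the operations shown above):
+
+{% highlight python %}
+# flat_map passes a collector to the function, so the lambda must return an iterable
+expanded = data.flat_map(lambda x, c: [x, x * 2], INT)
+# map produces exactly one value per element, so a plain value is returned
+doubled = data.map(lambda x: x * 2, INT)
+{% endhighlight %}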
+
+Flink requires type information at the time when it prepares the program for execution
+(when the main method of the program is called). This is done by passing an exemplary
+object that has the desired type. This also holds for tuples.
+
+{% highlight python %}
+(INT, STRING)
+{% endhighlight %}
+
+This would denote a tuple containing an int and a string. Note that for operations
+that work strictly on tuples (like cross), no braces are required.
+
+A few constants defined in `flink.plan.Constants` (such as `INT` and `STRING`) allow specifying these types in a more readable fashion.
+
+[Back to top](#top)
+
+Data Types
+----------
+
+Flink's Python API currently only supports primitive Python types (int, float, bool, string) and byte arrays.
+
+#### Tuples/Lists
+
+You can use tuples (or lists) for composite types. Python tuples are mapped to the Flink Tuple type, which contains
+a fixed number of fields of various types (up to 25). Every field of a tuple can be a primitive type - including further tuples, resulting in nested tuples.
+
+{% highlight python %}
+word_counts = env.from_elements(("hello", 1), ("world",2))
+
+counts = word_counts.map(lambda x: x[1], INT)
+{% endhighlight %}
+
+When working with operators that require a key for grouping or matching records,
+tuples let you simply specify the positions of the fields to be used as the key. You can specify more
+than one position to use composite keys (see [Section Data Transformations](#transformations)).
+
+{% highlight python %}
+wordCounts \
+    .group_by(0) \
+    .reduce(MyReduceFunction())
+{% endhighlight %}
+
+[Back to top](#top)
+
+Data Sources
+------------
+
+Data sources create the initial data sets, such as from files or from 
collections.
+
+File-based:
+
+- `read_text(path)` - Reads files line wise and returns them as Strings.
+- `read_csv(path, type)` - Parses files of comma (or another char) delimited 
fields.
+  Returns a DataSet of tuples. Supports the basic java types and their Value 
counterparts as field
+  types.
+
+Collection-based:
+
+- `from_elements(*args)` - Creates a data set from the given elements. All elements must be of the same type.
+
+**Examples**
+
+{% highlight python %}
+env = get_environment()
+
+# read text file from local file system
+localLines = env.read_text("file:///path/to/my/textfile")
+
+# read text file from a HDFS running at nnHost:nnPort
+hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
+
+# read a CSV file with three fields
+csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
+
+# create a set from some given elements
+values = env.from_elements("Foo", "bar", "foobar", "fubar")
+{% endhighlight %}
+
+[Back to top](#top)
+
+Data Sinks
+----------
+
+Data sinks consume DataSets and are used to store or return them:
+
+- `write_text()` - Writes elements line-wise as Strings. The Strings are
+  obtained by calling the *str()* method of each element.
+- `write_csv(...)` - Writes tuples as comma-separated value files. Row and 
field
+  delimiters are configurable. The value for each field comes from the *str()* 
method of the objects.
+- `output()` - Prints the *str()* value of each element on the
+  standard out.
+
+A DataSet can be input to multiple operations. Programs can write or print a data set and at the
+same time run additional transformations on it.
+
+**Examples**
+
+Standard data sink methods:
+
+{% highlight python %}
+# write DataSet to a file on the local file system
+textData.write_text("file:///my/result/on/localFS")
+
+# write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
+textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
+
+# write DataSet to a file and overwrite the file if it exists
+textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
+
+# tuples as lines with pipe as the separator "a|b|c"
+values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
+
+# this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
+values.write_text("file:///path/to/the/result/file")
+{% endhighlight %}
+
+[Back to top](#top)
+
+Broadcast Variables
+-------------------
+
+Broadcast variables allow you to make a data set available to all parallel 
instances of an
+operation, in addition to the regular input of the operation. This is useful 
for auxiliary data
+sets, or data-dependent parameterization. The data set will then be accessible 
at the operator as a
+Collection.
+
+- **Broadcast**: broadcast sets are registered by name via 
`with_broadcast_set(DataSet, String)`
+- **Access**: accessible via `self.context.get_broadcast_variable(String)` at 
the target operator
+
+{% highlight python %}
+class MapperBcv(MapFunction):
+    def map(self, value):
+        factor = self.context.get_broadcast_variable("bcv")[0][0]
+        return value * factor
+
+# 1. The DataSet to be broadcasted
+toBroadcast = env.from_elements(1, 2, 3) 
+data = env.from_elements("a", "b")
+
+# 2. Broadcast the DataSet
+data.map(MapperBcv(), INT).with_broadcast_set("bcv", toBroadcast) 
+{% endhighlight %}
+
+Make sure that the names (`bcv` in the previous example) match when 
registering and
+accessing broadcasted data sets.
+
+**Note**: As the content of broadcast variables is kept in-memory on each 
node, it should not become
+too large. For simpler things like scalar values you can simply parameterize 
the rich function.
+
+[Back to top](#top)
+
+Parallel Execution
+------------------
+
+This section describes how the parallel execution of programs can be 
configured in Flink. A Flink
+program consists of multiple tasks (operators, data sources, and sinks). A 
task is split into
+several parallel instances for execution and each parallel instance processes 
a subset of the task's
+input data. The number of parallel instances of a task is called its 
*parallelism* or *degree of
+parallelism (DOP)*.
+
+The degree of parallelism of a task can be specified in Flink on different 
levels.
+
+### Execution Environment Level
+
+Flink programs are executed in the context of an [execution environment](#program-skeleton). An
+execution environment defines a default parallelism for all operators, data 
sources, and data sinks
+it executes. Execution environment parallelism can be overwritten by 
explicitly configuring the
+parallelism of an operator.
+
+The default parallelism of an execution environment can be specified by 
calling the
+`set_degree_of_parallelism()` method. To execute all operators, data sources, 
and data sinks of the
+[WordCount](#example-program) example program with a parallelism of `3`, set 
the default parallelism of the
+execution environment as follows:
+
+{% highlight python %}
+env = get_environment()
+env.set_degree_of_parallelism(3)
+
+text.flat_map(lambda x, c: [(1, word) for word in x.lower().split()], (INT, STRING)) \
+    .group_by(1) \
+    .reduce_group(Adder(), (INT, STRING), combinable=True) \
+    .output()
+
+env.execute()
+{% endhighlight %}
+
+### System Level
+
+A system-wide default parallelism for all execution environments can be 
defined by setting the
+`parallelization.degree.default` property in `./conf/flink-conf.yaml`. See the
+[Configuration](config.html) documentation for details.
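+
+For example, a sketch of the corresponding entry in `./conf/flink-conf.yaml` (the value `3` is illustrative):
+
+{% highlight yaml %}
+parallelization.degree.default: 3
+{% endhighlight %}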
+
+[Back to top](#top)
+
+Executing Plans
+---------------
+
+To run the plan with Flink, go to your Flink distribution and run the pyflink.sh script from the /bin folder:
+use pyflink2.sh for Python 2.7 and pyflink3.sh for Python 3.4. The script containing the plan has to be passed
+as the first argument, followed by a number of additional Python packages, and finally, separated by `-`, additional
+arguments that will be fed to the script.
+
+{% highlight bash %}
+./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX>]][ - <param1>[ <paramX>]]
+{% endhighlight %}
+
+[Back to top](#top)
+
+Debugging
+---------------
+
+If you are running Flink programs locally, you can debug your program as follows.
+First, enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After
+submitting your program, open the jobmanager log file and look for a line that says
+`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>`. Now open `/tmp/flink` in your Python
+IDE and run `executor.py <port>`.
+
+[Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index 8f20648..de9ed8a 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -112,6 +112,12 @@ under the License.
                        <artifactId>flink-language-binding-generic</artifactId>
                        <version>${project.version}</version>
                </dependency>
+                
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-python</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
        </dependencies>
 
        <!-- See main pom.xml for explanation of profiles -->

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml 
b/flink-dist/src/main/assemblies/bin.xml
index 548e369..6a429ee 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -129,7 +129,12 @@ under the License.
                                
<exclude>flink-java-examples-${project.version}-tests.jar</exclude>
                        </excludes>
                </fileSet>
-
+               <fileSet>
+                       <!-- copy python package -->
+                       
<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory>
+                       <outputDirectory>resources/python/</outputDirectory>
+                       <fileMode>0755</fileMode>
+               </fileSet>
        </fileSets>
 
 </assembly>

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-dist/src/main/flink-bin/bin/flink
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/flink 
b/flink-dist/src/main/flink-bin/bin/flink
index 094ddf4..d28c04f 100644
--- a/flink-dist/src/main/flink-bin/bin/flink
+++ b/flink-dist/src/main/flink-bin/bin/flink
@@ -32,6 +32,7 @@ CC_CLASSPATH=`constructFlinkClassPath`
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-flink-client-$HOSTNAME.log
 log_setting="-Dlog.file="$log" 
-Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties 
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"
 
+export FLINK_ROOT_DIR
 export FLINK_CONF_DIR
 
 # Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-dist/src/main/flink-bin/bin/pyflink2.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.sh 
b/flink-dist/src/main/flink-bin/bin/pyflink2.sh
new file mode 100644
index 0000000..33c175d
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink2.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+"$FLINK_BIN_DIR"/flink run -v 
"$FLINK_ROOT_DIR"/lib/flink-python-0.9-SNAPSHOT.jar "2" "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-dist/src/main/flink-bin/bin/pyflink3.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.sh 
b/flink-dist/src/main/flink-bin/bin/pyflink3.sh
new file mode 100644
index 0000000..4ebe732
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink3.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+"$FLINK_BIN_DIR"/flink run -v 
"$FLINK_ROOT_DIR"/lib/flink-python-0.9-SNAPSHOT.jar "3" "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/pom.xml 
b/flink-staging/flink-language-binding/flink-python/pom.xml
new file mode 100644
index 0000000..1a06dfb
--- /dev/null
+++ b/flink-staging/flink-language-binding/flink-python/pom.xml
@@ -0,0 +1,86 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>flink-language-binding-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.9-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+       
+    <artifactId>flink-python</artifactId>
+    <name>flink-python</name>
+    <packaging>jar</packaging>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            
<mainClass>org.apache.flink.languagebinding.api.java.python.PythonPlanBinder</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-compiler</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-language-binding-generic</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
new file mode 100644
index 0000000..d65931a
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.HashMap;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.operators.CoGroupRawOperator;
+import org.apache.flink.api.java.operators.SortedGrouping;
+import org.apache.flink.api.java.operators.UnsortedGrouping;
+import org.apache.flink.api.java.tuple.Tuple;
+import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.languagebinding.api.java.common.PlanBinder;
+import org.apache.flink.languagebinding.api.java.common.OperationInfo;
+import 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo;
+//CHECKSTYLE.OFF: AvoidStarImport - enum/function import
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.PythonOperationInfo.*;
+import org.apache.flink.languagebinding.api.java.python.functions.*;
+//CHECKSTYLE.ON: AvoidStarImport
+import org.apache.flink.languagebinding.api.java.common.streaming.Receiver;
+import 
org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class allows the execution of a Flink plan written in python.
+ */
+public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
+       static final Logger LOG = 
LoggerFactory.getLogger(PythonPlanBinder.class);
+
+       public static final String ARGUMENT_PYTHON_2 = "2";
+       public static final String ARGUMENT_PYTHON_3 = "3";
+
+       public static final String FLINK_PYTHON_DC_ID = "flink";
+       public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
+       public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
+
+       public static final String FLINK_PYTHON2_BINARY_KEY = 
"python.binary.python2";
+       public static final String FLINK_PYTHON3_BINARY_KEY = 
"python.binary.python3";
+       public static  String FLINK_PYTHON2_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+       public static  String FLINK_PYTHON3_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+
+       private static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + "/flink_plan";
+       protected static final String FLINK_PYTHON_REL_LOCAL_PATH = 
"/resources/python";
+       protected static final String FLINK_DIR = 
System.getenv("FLINK_ROOT_DIR");
+       protected static String FULL_PATH;
+
+       private Process process;
+
+       public static boolean usePython3 = false;
+
+       /**
+        * Entry point for the execution of a python plan.
+        *
+        * @param args planPath[ package1[ packageX[ - parameter1[ 
parameterX]]]]
+        * @throws Exception
+        */
+       public static void main(String[] args) throws Exception {
+               if (args.length < 2) {
+                       System.out.println("Usage: ./bin/pyflink<2/3>.sh 
<pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ 
<parameterX>]]");
+                       return;
+               }
+               usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
+               PythonPlanBinder binder = new PythonPlanBinder();
+               binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
+       }
+
+       public PythonPlanBinder() throws IOException {
+               FLINK_PYTHON2_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+               FLINK_PYTHON3_BINARY_PATH = 
GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+               FULL_PATH = FLINK_DIR != null
+                               //substring is used because the root dir path 
ends with "/bin/.."
+                               ? FLINK_DIR.substring(0, FLINK_DIR.length() - 
7) + FLINK_PYTHON_REL_LOCAL_PATH //command-line
+                               : 
FileSystem.getLocalFileSystem().getWorkingDirectory().toString() //testing
+                               + 
"/src/main/python/org/apache/flink/languagebinding/api/python";
+       }
+
+       protected void runPlan(String[] args) throws Exception {
+               env = ExecutionEnvironment.getExecutionEnvironment();
+
+               int split = 0;
+               for (int x = 0; x < args.length; x++) {
+                       if (args[x].compareTo("-") == 0) {
+                               split = x;
+                       }
+               }
+               try {
+                       prepareFiles(Arrays.copyOfRange(args, 0, split == 0 ? 1 
: split));
+                       startPython(Arrays.copyOfRange(args, split == 0 ? 
args.length : split + 1, args.length));
+                       receivePlan();
+
+                       if (env instanceof LocalEnvironment) {
+                               FLINK_HDFS_PATH = "file:/tmp/flink";
+                       }
+
+                       distributeFiles(env);
+                       env.execute();
+                       close();
+               } catch (Exception e) {
+                       close();
+                       throw e;
+               }
+       }
+
+       
//=====Setup========================================================================================================
+       /**
+        * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute them as a
+        * single package and resolves PYTHONPATH issues.
+        *
+        * @param filePaths path to the plan file, followed by paths to additional packages/files
+        * @throws IOException
+        * @throws URISyntaxException
+        */
+       private void prepareFiles(String... filePaths) throws IOException, 
URISyntaxException {
+               //Flink python package
+               String tempFilePath = FLINK_PYTHON_FILE_PATH;
+               clearPath(tempFilePath);
+               FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), 
false);
+
+               //plan file             
+               copyFile(filePaths[0], FLINK_PYTHON_PLAN_NAME);
+
+               //additional files/folders
+               for (int x = 1; x < filePaths.length; x++) {
+                       copyFile(filePaths[x], null);
+               }
+       }
+
+       private static void clearPath(String path) throws IOException, 
URISyntaxException {
+               FileSystem fs = FileSystem.get(new URI(path));
+               if (fs.exists(new Path(path))) {
+                       fs.delete(new Path(path), true);
+               }
+       }
+
+       private static void copyFile(String path, String name) throws 
IOException, URISyntaxException {
+               if (path.endsWith("/")) {
+                       path = path.substring(0, path.length() - 1);
+               }
+               String identifier = name == null ? 
path.substring(path.lastIndexOf("/")) : name;
+               String tmpFilePath = FLINK_PYTHON_FILE_PATH + "/" + identifier;
+               clearPath(tmpFilePath);
+               Path p = new Path(path);
+               FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new 
Path(tmpFilePath), true);
+       }
+
+       private static void distributeFiles(ExecutionEnvironment env) throws 
IOException, URISyntaxException {
+               clearPath(FLINK_HDFS_PATH);
+               FileCache.copy(new Path(FLINK_PYTHON_FILE_PATH), new 
Path(FLINK_HDFS_PATH), true);
+               env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
+               clearPath(FLINK_PYTHON_FILE_PATH);
+       }
+
+       private void startPython(String[] args) throws IOException {
+               sets = new HashMap();
+               StringBuilder argsBuilder = new StringBuilder();
+               for (String arg : args) {
+                       argsBuilder.append(" ").append(arg);
+               }
+               receiver = new Receiver(null);
+               receiver.open(null);
+
+               if (usePython3) {
+                       try {
+                               
Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
+                       } catch (IOException ex) {
+                               throw new 
RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " 
does not point to a valid python binary.");
+                       }
+                       process = 
Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH + " -B "
+                                       + FLINK_PYTHON_FILE_PATH + 
FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+               } else {
+                       try {
+                               
Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
+                       } catch (IOException ex) {
+                               throw new 
RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " 
does not point to a valid python binary.");
+                       }
+                       process = 
Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH + " -B "
+                                       + FLINK_PYTHON_FILE_PATH + 
FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+               }
+               new StreamPrinter(process.getInputStream()).start();
+               new StreamPrinter(process.getErrorStream()).start();
+
+               try { // wait a moment so that a plan file that fails on startup is detected below
+                       Thread.sleep(2000);
+               } catch (InterruptedException ex) {
+               }
+
+               try {
+                       int value = process.exitValue();
+                       if (value != 0) {
+                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+                       }
+               } catch (IllegalThreadStateException ise) { //process still running -> plan file started successfully
+               }
+       }
+
+       private void close() {
+               try { //prevent throwing exception so that previous exceptions 
aren't hidden.
+                       if (!DEBUG) {
+                               FileSystem hdfs = FileSystem.get(new 
URI(FLINK_HDFS_PATH));
+                               hdfs.delete(new Path(FLINK_HDFS_PATH), true);
+                       }
+
+                       FileSystem local = FileSystem.getLocalFileSystem();
+                       local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
+                       local.delete(new Path(FLINK_TMP_DATA_DIR), true);
+                       receiver.close();
+               } catch (NullPointerException npe) {
+               } catch (IOException ioe) {
+                       LOG.error("PythonAPI file cleanup failed. " + 
ioe.getMessage());
+               } catch (URISyntaxException use) { // can't occur
+               }
+               try {
+                       process.exitValue();
+               } catch (NullPointerException npe) { //exception occurred 
before process was started
+               } catch (IllegalThreadStateException ise) { //process still 
active
+                       process.destroy();
+               }
+       }
+
+       //=====Plan 
Binding=================================================================================================
+       protected class PythonOperationInfo extends OperationInfo {
+               protected byte[] operator;
+               protected String meta;
+               protected boolean combine;
+               protected byte[] combineOperator;
+               protected String name;
+
+               @Override
+               public String toString() {
+                       StringBuilder sb = new StringBuilder();
+                       sb.append("SetID: ").append(setID).append("\n");
+                       sb.append("ParentID: ").append(parentID).append("\n");
+                       sb.append("OtherID: ").append(otherID).append("\n");
+                       sb.append("Name: ").append(name).append("\n");
+                       sb.append("Operator: ").append(operator == null ? null 
: "<operator>").append("\n");
+                       sb.append("Meta: ").append(meta).append("\n");
+                       sb.append("Types: ").append(types).append("\n");
+                       sb.append("Combine: ").append(combine).append("\n");
+                       sb.append("CombineOP: ").append(combineOperator == null 
? null : "<combineop>").append("\n");
+                       sb.append("Keys1: 
").append(Arrays.toString(keys1)).append("\n");
+                       sb.append("Keys2: 
").append(Arrays.toString(keys2)).append("\n");
+                       sb.append("Projections: 
").append(Arrays.toString(projections)).append("\n");
+                       return sb.toString();
+               }
+
+               protected PythonOperationInfo(AbstractOperation identifier) 
throws IOException {
+                       Object tmpType;
+                       setID = (Integer) receiver.getRecord(true);
+                       parentID = (Integer) receiver.getRecord(true);
+                       switch (identifier) {
+                               case COGROUP:
+                                       otherID = (Integer) 
receiver.getRecord(true);
+                                       keys1 = tupleToIntArray((Tuple) 
receiver.getRecord(true));
+                                       keys2 = tupleToIntArray((Tuple) 
receiver.getRecord(true));
+                                       operator = (byte[]) 
receiver.getRecord();
+                                       meta = (String) receiver.getRecord();
+                                       tmpType = receiver.getRecord();
+                                       types = tmpType == null ? null : 
getForObject(tmpType);
+                                       name = (String) receiver.getRecord();
+                                       break;
+                               case CROSS:
+                               case CROSS_H:
+                               case CROSS_T:
+                                       otherID = (Integer) 
receiver.getRecord(true);
+                                       operator = (byte[]) 
receiver.getRecord();
+                                       meta = (String) receiver.getRecord();
+                                       tmpType = receiver.getRecord();
+                                       types = tmpType == null ? null : 
getForObject(tmpType);
+                                       int cProjectCount = (Integer) 
receiver.getRecord(true);
+                                       projections = new 
ProjectionEntry[cProjectCount];
+                                       for (int x = 0; x < cProjectCount; x++) 
{
+                                               String side = (String) 
receiver.getRecord();
+                                               int[] keys = 
tupleToIntArray((Tuple) receiver.getRecord(true));
+                                               projections[x] = new 
ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
+                                       }
+                                       name = (String) receiver.getRecord();
+                                       break;
+                               case REDUCE:
+                               case GROUPREDUCE:
+                                       operator = (byte[]) 
receiver.getRecord();
+                                       combineOperator = (byte[]) 
receiver.getRecord();
+                                       meta = (String) receiver.getRecord();
+                                       tmpType = receiver.getRecord();
+                                       types = tmpType == null ? null : 
getForObject(tmpType);
+                                       combine = (Boolean) 
receiver.getRecord();
+                                       name = (String) receiver.getRecord();
+                                       break;
+                               case JOIN:
+                               case JOIN_H:
+                               case JOIN_T:
+                                       keys1 = tupleToIntArray((Tuple) 
receiver.getRecord(true));
+                                       keys2 = tupleToIntArray((Tuple) 
receiver.getRecord(true));
+                                       otherID = (Integer) 
receiver.getRecord(true);
+                                       operator = (byte[]) 
receiver.getRecord();
+                                       meta = (String) receiver.getRecord();
+                                       tmpType = receiver.getRecord();
+                                       types = tmpType == null ? null : 
getForObject(tmpType);
+                                       int jProjectCount = (Integer) 
receiver.getRecord(true);
+                                       projections = new 
ProjectionEntry[jProjectCount];
+                                       for (int x = 0; x < jProjectCount; x++) 
{
+                                               String side = (String) 
receiver.getRecord();
+                                               int[] keys = 
tupleToIntArray((Tuple) receiver.getRecord(true));
+                                               projections[x] = new 
ProjectionEntry(ProjectionSide.valueOf(side.toUpperCase()), keys);
+                                       }
+                                       name = (String) receiver.getRecord();
+                                       break;
+                               case MAPPARTITION:
+                               case FLATMAP:
+                               case MAP:
+                               case FILTER:
+                                       operator = (byte[]) 
receiver.getRecord();
+                                       meta = (String) receiver.getRecord();
+                                       tmpType = receiver.getRecord();
+                                       types = tmpType == null ? null : 
getForObject(tmpType);
+                                       name = (String) receiver.getRecord();
+                                       break;
+                               default:
+                                       throw new 
UnsupportedOperationException("This operation is not implemented in the Python 
API: " + identifier);
+                       }
+               }
+       }
+
+       @Override
+       protected PythonOperationInfo createOperationInfo(AbstractOperation 
identifier) throws IOException {
+               return new PythonOperationInfo(identifier);
+       }
+
+       @Override
+       protected DataSet applyCoGroupOperation(DataSet op1, DataSet op2, int[] 
firstKeys, int[] secondKeys, PythonOperationInfo info) {
+               return new CoGroupRawOperator(
+                               op1,
+                               op2,
+                               new Keys.ExpressionKeys(firstKeys, 
op1.getType()),
+                               new Keys.ExpressionKeys(secondKeys, 
op2.getType()),
+                               new PythonCoGroup(info.setID, info.operator, 
info.types, info.meta),
+                               info.types, info.name);
+       }
+
+       @Override
+       protected DataSet applyCrossOperation(DataSet op1, DataSet op2, 
DatasizeHint mode, PythonOperationInfo info) {
+               switch (mode) {
+                       case NONE:
+                               return op1.cross(op2).name("PythonCrossPreStep")
+                                               .mapPartition(new 
PythonMapPartition(info.setID, info.operator, info.types, 
info.meta)).name(info.name);
+                       case HUGE:
+                               return 
op1.crossWithHuge(op2).name("PythonCrossPreStep")
+                                               .mapPartition(new 
PythonMapPartition(info.setID, info.operator, info.types, 
info.meta)).name(info.name);
+                       case TINY:
+                               return 
op1.crossWithTiny(op2).name("PythonCrossPreStep")
+                                               .mapPartition(new 
PythonMapPartition(info.setID, info.operator, info.types, 
info.meta)).name(info.name);
+                       default:
+                               throw new IllegalArgumentException("Invalid 
Cross mode specified: " + mode);
+               }
+       }
+
+       @Override
+       protected DataSet applyFilterOperation(DataSet op1, PythonOperationInfo 
info) {
+               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.operator, info.types, info.meta)).name(info.name);
+       }
+
+       @Override
+       protected DataSet applyFlatMapOperation(DataSet op1, 
PythonOperationInfo info) {
+               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.operator, info.types, info.meta)).name(info.name);
+       }
+
+       @Override
+       protected DataSet applyGroupReduceOperation(DataSet op1, 
PythonOperationInfo info) {
+               if (info.combine) {
+                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+                                       
.setCombinable(true).name("PythonCombine")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               } else {
+                       return op1.reduceGroup(new PythonCombineIdentity())
+                                       
.setCombinable(false).name("PythonGroupReducePreStep")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               }
+       }
+
+       @Override
+       protected DataSet applyGroupReduceOperation(UnsortedGrouping op1, 
PythonOperationInfo info) {
+               if (info.combine) {
+                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+                                       
.setCombinable(true).name("PythonCombine")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               } else {
+                       return op1.reduceGroup(new PythonCombineIdentity())
+                                       
.setCombinable(false).name("PythonGroupReducePreStep")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               }
+       }
+
+       @Override
+       protected DataSet applyGroupReduceOperation(SortedGrouping op1, 
PythonOperationInfo info) {
+               if (info.combine) {
+                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+                                       
.setCombinable(true).name("PythonCombine")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               } else {
+                       return op1.reduceGroup(new PythonCombineIdentity())
+                                       
.setCombinable(false).name("PythonGroupReducePreStep")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               }
+       }
+
+       @Override
+       protected DataSet applyJoinOperation(DataSet op1, DataSet op2, int[] 
firstKeys, int[] secondKeys, DatasizeHint mode, PythonOperationInfo info) {
+               switch (mode) {
+                       case NONE:
+                               return 
op1.join(op2).where(firstKeys).equalTo(secondKeys).name("PythonJoinPreStep")
+                                               .mapPartition(new 
PythonMapPartition(info.setID, info.operator, info.types, 
info.meta)).name(info.name);
+                       case HUGE:
+                               return 
op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).name("PythonJoinPreStep")
+                                               .mapPartition(new 
PythonMapPartition(info.setID, info.operator, info.types, 
info.meta)).name(info.name);
+                       case TINY:
+                               return 
op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).name("PythonJoinPreStep")
+                                               .mapPartition(new 
PythonMapPartition(info.setID, info.operator, info.types, 
info.meta)).name(info.name);
+                       default:
+                               throw new IllegalArgumentException("Invalid 
join mode specified.");
+               }
+       }
+
+       @Override
+       protected DataSet applyMapOperation(DataSet op1, PythonOperationInfo 
info) {
+               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.operator, info.types, info.meta)).name(info.name);
+       }
+
+       @Override
+       protected DataSet applyMapPartitionOperation(DataSet op1, 
PythonOperationInfo info) {
+               return op1.mapPartition(new PythonMapPartition(info.setID, 
info.operator, info.types, info.meta)).name(info.name);
+       }
+
+       @Override
+       protected DataSet applyReduceOperation(DataSet op1, PythonOperationInfo 
info) {
+               return op1.reduceGroup(new PythonCombineIdentity())
+                               
.setCombinable(false).name("PythonReducePreStep")
+                               .mapPartition(new PythonMapPartition(info.setID 
* -1, info.operator, info.types, info.meta))
+                               .name(info.name);
+       }
+
+       @Override
+       protected DataSet applyReduceOperation(UnsortedGrouping op1, 
PythonOperationInfo info) {
+               if (info.combine) {
+                       return op1.reduceGroup(new 
PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+                                       
.setCombinable(true).name("PythonCombine")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               } else {
+                       return op1.reduceGroup(new PythonCombineIdentity())
+                                       
.setCombinable(false).name("PythonReducePreStep")
+                                       .mapPartition(new 
PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+                                       .name(info.name);
+               }
+       }
+}
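
For reference, the argument layout documented on main() above (planPath[ package1 ...][ - parameter1 ...]) translates into an invocation like the following sketch. This is not part of the commit: the interpreter marker, paths and the trailing parameter are illustrative assumptions; everything before the standalone "-" is copied next to the plan file, everything after it is handed to the Python script.

    import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;

    public class PythonPlanBinderInvocationSketch {
            public static void main(String[] args) throws Exception {
                    // Hypothetical arguments: the first token selects the interpreter
                    // (it is compared against ARGUMENT_PYTHON_3; "2"/"3" are assumed values here).
                    PythonPlanBinder.main(new String[] {
                            "2",                    // assumed python-2 marker
                            "/tmp/example_plan.py", // plan file, copied as FLINK_PYTHON_PLAN_NAME
                            "-",                    // separates file paths from plan parameters
                            "10"                    // forwarded unchanged to the Python plan
                    });
            }
    }

The same argument list is what bin/pyflink2.sh / bin/pyflink3.sh are expected to pass along; the interpreter paths themselves can be overridden through the python.binary.python2 and python.binary.python3 configuration keys read via GlobalConfiguration.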

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
new file mode 100644
index 0000000..01f18eb
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python.functions;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer;
+import org.apache.flink.util.Collector;
+import java.io.IOException;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * CoGroupFunction that uses a python script.
+ *
+ * @param <IN1>
+ * @param <IN2>
+ * @param <OUT>
+ */
+public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, 
IN2, OUT> implements ResultTypeQueryable {
+       private final PythonStreamer streamer;
+       private transient final TypeInformation<OUT> typeInformation;
+
+       public PythonCoGroup(int id, byte[] operator, TypeInformation<OUT> 
typeInformation, String metaInformation) {
+               this.typeInformation = typeInformation;
+               streamer = new PythonStreamer(this, id, operator, 
metaInformation);
+       }
+
+       /**
+        * Opens this function.
+        *
+        * @param config configuration
+        * @throws IOException
+        */
+       @Override
+       public void open(Configuration config) throws IOException {
+               streamer.open();
+               streamer.sendBroadCastVariables(config);
+       }
+
+       /**
+        * Calls the external python function.
+        *
+        * @param first
+        * @param second
+        * @param out collector
+        * @throws IOException
+        */
+       @Override
+       public final void coGroup(Iterable<IN1> first, Iterable<IN2> second, 
Collector<OUT> out) throws Exception {
+               streamer.streamBufferWithGroups(first.iterator(), 
second.iterator(), out);
+       }
+
+       /**
+        * Closes this function.
+        *
+        * @throws IOException
+        */
+       @Override
+       public void close() throws IOException {
+               streamer.close();
+       }
+
+       @Override
+       public TypeInformation<OUT> getProducedType() {
+               return typeInformation;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
new file mode 100644
index 0000000..3395f07
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python.functions;
+
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer;
+import org.apache.flink.util.Collector;
+import java.io.IOException;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+
+/**
+ * Multi-purpose class, used for Combine-operations using a python script, and 
as a preprocess step for
+ * GroupReduce-operations.
+ *
+ * @param <IN>
+ */
+public class PythonCombineIdentity<IN> extends RichGroupReduceFunction<IN, IN> 
{
+       private PythonStreamer streamer;
+
+       public PythonCombineIdentity() {
+               streamer = null;
+       }
+
+       public PythonCombineIdentity(int id, byte[] operator, String 
metaInformation) {
+               streamer = new PythonStreamer(this, id, operator, 
metaInformation);
+       }
+
+       @Override
+       public void open(Configuration config) throws IOException {
+               if (streamer != null) {
+                       streamer.open();
+                       streamer.sendBroadCastVariables(config);
+               }
+       }
+
+       /**
+        * Forwards all values unchanged; the actual python function is invoked in the chained map-partition step.
+        *
+        * @param values function input
+        * @param out collector
+        * @throws IOException
+        */
+       @Override
+       public final void reduce(Iterable<IN> values, Collector<IN> out) throws 
Exception {
+               for (IN value : values) {
+                       out.collect(value);
+               }
+       }
+
+       /**
+        * Calls the external python function.
+        *
+        * @param values function input
+        * @param out collector
+        * @throws IOException
+        */
+       @Override
+       public final void combine(Iterable<IN> values, Collector<IN> out) 
throws Exception {
+               streamer.streamBufferWithoutGroups(values.iterator(), out);
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (streamer != null) {
+                       streamer.close();
+                       streamer = null;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
new file mode 100644
index 0000000..f582e3d
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python.functions;
+
+import java.io.IOException;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.languagebinding.api.java.python.streaming.PythonStreamer;
+import org.apache.flink.util.Collector;
+
+/**
+ * Multi-purpose class, usable by all operations using a python script with 
one input source and possibly differing
+ * in-/output types.
+ *
+ * @param <IN>
+ * @param <OUT>
+ */
+public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, 
OUT> implements ResultTypeQueryable {
+       private final PythonStreamer streamer;
+       private transient final TypeInformation<OUT> typeInformation;
+
+       public PythonMapPartition(int id, byte[] operator, TypeInformation<OUT> 
typeInformation, String metaInformation) {
+               this.typeInformation = typeInformation;
+               streamer = new PythonStreamer(this, id, operator, 
metaInformation);
+       }
+
+       /**
+        * Opens this function.
+        *
+        * @param config configuration
+        * @throws IOException
+        */
+       @Override
+       public void open(Configuration config) throws IOException {
+               streamer.open();
+               streamer.sendBroadCastVariables(config);
+       }
+
+       @Override
+       public void mapPartition(Iterable<IN> values, Collector<OUT> out) 
throws Exception {
+               streamer.streamBufferWithoutGroups(values.iterator(), out);
+       }
+
+       /**
+        * Closes this function.
+        *
+        * @throws IOException
+        */
+       @Override
+       public void close() throws IOException {
+               streamer.close();
+       }
+
+       @Override
+       public TypeInformation<OUT> getProducedType() {
+               return typeInformation;
+       }
+}
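
This map-partition wrapper is the workhorse for every single-input Python operation (map, flatMap, filter, mapPartition and the post-steps of reduce, join and cross). A minimal sketch of how the binder applies it, with hypothetical identifiers standing in for the values read from the Python plan (setId, operatorBytes, resultType, meta):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.languagebinding.api.java.python.functions.PythonMapPartition;

    public class PythonMapPartitionSketch {
            // Hedged sketch: the parameters are assumed to come from a deserialized
            // PythonOperationInfo; they are placeholders, not part of this commit.
            public static <IN, OUT> DataSet<OUT> applyPythonOperation(
                            DataSet<IN> input, int setId, byte[] operatorBytes,
                            TypeInformation<OUT> resultType, String meta, String name) {
                    return input
                            .mapPartition(new PythonMapPartition<IN, OUT>(setId, operatorBytes, resultType, meta))
                            .name(name);
            }
    }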

http://git-wip-us.apache.org/repos/asf/flink/blob/d182daa1/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
new file mode 100644
index 0000000..835d95b
--- /dev/null
+++ 
b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.languagebinding.api.java.python.streaming;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_DC_ID;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
+import static 
org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
+import 
org.apache.flink.languagebinding.api.java.common.streaming.StreamPrinter;
+import org.apache.flink.languagebinding.api.java.common.streaming.Streamer;
+import org.apache.flink.languagebinding.api.java.python.PythonPlanBinder;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_KEY;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_KEY;
+import static 
org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
+
+/**
+ * This streamer is used by functions to send/receive data to/from an external 
python process.
+ */
+public class PythonStreamer extends Streamer {
+       private final byte[] operator;
+       private Process process;
+       private final String metaInformation;
+       private final int id;
+       private final boolean usePython3;
+       private final boolean debug;
+
+       private String inputFilePath;
+       private String outputFilePath;
+
+       public PythonStreamer(AbstractRichFunction function, int id, byte[] 
operator, String metaInformation) {
+               super(function);
+               this.operator = operator;
+               this.metaInformation = metaInformation;
+               this.id = id;
+               this.usePython3 = PythonPlanBinder.usePython3;
+               this.debug = DEBUG;
+       }
+
+       /**
+        * Starts the python script.
+        *
+        * @throws IOException
+        */
+       @Override
+       public void setupProcess() throws IOException {
+               startPython();
+       }
+
+       private void startPython() throws IOException {
+               this.outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+               this.inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+
+               sender.open(inputFilePath);
+               receiver.open(outputFilePath);
+
+               ProcessBuilder pb = new ProcessBuilder();
+
+               String path = 
function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
+               String executorPath = path + FLINK_PYTHON_EXECUTOR_NAME;
+               String[] frag = metaInformation.split("\\|");
+               StringBuilder importString = new StringBuilder();
+               if (frag[0].contains("__main__")) {
+                       importString.append("from ");
+                       importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, 
FLINK_PYTHON_PLAN_NAME.length() - 3));
+                       importString.append(" import ");
+                       importString.append(frag[1]);
+               } else {
+                       importString.append("import ");
+                       importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, 
FLINK_PYTHON_PLAN_NAME.length() - 3));
+               }
+
+               if (usePython3) {
+                       try {
+                               
Runtime.getRuntime().exec(FLINK_PYTHON3_BINARY_PATH);
+                       } catch (IOException ex) {
+                               throw new 
RuntimeException(FLINK_PYTHON3_BINARY_KEY + "=" + FLINK_PYTHON3_BINARY_PATH + " 
does not point to a valid python binary.");
+                       }
+                       pb.command(FLINK_PYTHON3_BINARY_PATH, "-O", "-B", 
executorPath, "" + socket.getLocalPort());
+               } else {
+                       try {
+                               
Runtime.getRuntime().exec(FLINK_PYTHON2_BINARY_PATH);
+                       } catch (IOException ex) {
+                               throw new 
RuntimeException(FLINK_PYTHON2_BINARY_KEY + "=" + FLINK_PYTHON2_BINARY_PATH + " 
does not point to a valid python binary.");
+                       }
+                       pb.command(FLINK_PYTHON2_BINARY_PATH, "-O", "-B", 
executorPath, "" + socket.getLocalPort());
+               }
+               if (debug) {
+                       socket.setSoTimeout(0);
+                       LOG.info("Waiting for Python process for task " + function.getRuntimeContext().getTaskName()
+                                       + ". Run: python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + socket.getLocalPort());
+               } else {
+                       process = pb.start();
+                       new StreamPrinter(process.getInputStream()).start();
+                       new StreamPrinter(process.getErrorStream(), true, 
msg).start();
+               }
+               byte[] executorPort = new byte[4];
+               socket.receive(new DatagramPacket(executorPort, 0, 4));
+               int exPort = getInt(executorPort, 0);
+               if (exPort == -2) {
+                       try { //wait before terminating to ensure that the 
complete error message is printed
+                               Thread.sleep(2000);
+                       } catch (InterruptedException ex) {
+                       }
+                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " terminated prematurely." + 
msg);
+               }
+
+               byte[] opSize = new byte[4];
+               putInt(opSize, 0, operator.length);
+               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
+               socket.send(new DatagramPacket(operator, 0, operator.length, 
host, exPort));
+
+               byte[] meta = importString.toString().getBytes("utf-8");
+               putInt(opSize, 0, meta.length);
+               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
+               socket.send(new DatagramPacket(meta, 0, meta.length, host, 
exPort));
+
+               byte[] input = inputFilePath.getBytes("utf-8");
+               putInt(opSize, 0, input.length);
+               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
+               socket.send(new DatagramPacket(input, 0, input.length, host, 
exPort));
+
+               byte[] output = outputFilePath.getBytes("utf-8");
+               putInt(opSize, 0, output.length);
+               socket.send(new DatagramPacket(opSize, 0, 4, host, exPort));
+               socket.send(new DatagramPacket(output, 0, output.length, host, 
exPort));
+
+               try { // wait a bit to catch syntax errors
+                       Thread.sleep(2000);
+               } catch (InterruptedException ex) {
+               }
+               if (!debug) {
+                       try {
+                               process.exitValue();
+                               throw new RuntimeException("External process 
for task " + function.getRuntimeContext().getTaskName() + " terminated 
prematurely." + msg);
+                       } catch (IllegalThreadStateException ise) { //process 
still active -> start receiving data
+                       }
+               }
+       }
+
+       /**
+        * Closes this streamer.
+        *
+        * @throws IOException
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       super.close();
+               } catch (Exception e) {
+                       LOG.error("Exception occurred while closing Streamer: " + e.getMessage());
+               }
+               if (!debug) {
+                       try {
+                               process.exitValue();
+                       } catch (IllegalThreadStateException ise) { //process 
still active
+                               process.destroy();
+                       }
+               }
+       }
+}
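
The handshake in startPython() above talks to the Python executor over UDP with a simple length-prefixed framing: a 4-byte size datagram followed by the payload datagram, repeated for the serialized operator, the import string and the two data-file paths. A self-contained sketch of that framing (the big-endian layout of putInt() is an assumption; helper and variable names are illustrative only):

    import java.io.IOException;
    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetAddress;

    public class FramingSketch {
            // Sends one length-prefixed payload: first a 4-byte size frame, then the payload itself.
            static void sendFramed(DatagramSocket socket, InetAddress host, int port, byte[] payload) throws IOException {
                    byte[] size = new byte[4];
                    size[0] = (byte) (payload.length >>> 24);
                    size[1] = (byte) (payload.length >>> 16);
                    size[2] = (byte) (payload.length >>> 8);
                    size[3] = (byte) payload.length;
                    socket.send(new DatagramPacket(size, 0, 4, host, port));
                    socket.send(new DatagramPacket(payload, 0, payload.length, host, port));
            }
    }

Before any of this, the executor reports its own port back to the streamer (the exPort read above); a value of -2 signals that the external process failed to start.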
