[FLINK-6229] [py] Rework configuration of PythonPlanBinder/Operators

- unify python2/python3 configuration
- explicitly pass on a configuration to each operator
- port all configuration options to ConfigOptions
- [FLINK-5516] Make all paths explicitly configurable
- [FLINK-6230] Make mmap size configurable


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdcebfda
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdcebfda
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdcebfda

Branch: refs/heads/table-retraction
Commit: bdcebfda06846a1e21bb6a4678909d503ebc6333
Parents: 940d16c
Author: zentol <ches...@apache.org>
Authored: Fri Mar 31 11:55:18 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Thu Apr 6 10:57:11 2017 +0200

----------------------------------------------------------------------
 docs/dev/batch/python.md                        |  10 +-
 flink-dist/src/main/flink-bin/bin/pyflink.bat   |  25 ++++
 flink-dist/src/main/flink-bin/bin/pyflink.sh    |  25 ++++
 flink-dist/src/main/flink-bin/bin/pyflink2.bat  |  25 ----
 flink-dist/src/main/flink-bin/bin/pyflink2.sh   |  25 ----
 flink-dist/src/main/flink-bin/bin/pyflink3.bat  |  25 ----
 flink-dist/src/main/flink-bin/bin/pyflink3.sh   |  26 ----
 .../apache/flink/python/api/PythonOptions.java  |  74 ++++++++++
 .../flink/python/api/PythonPlanBinder.java      | 143 +++++++++++--------
 .../python/api/functions/PythonCoGroup.java     |   4 +-
 .../api/functions/PythonMapPartition.java       |   4 +-
 .../streaming/data/PythonDualInputSender.java   |   6 +
 .../streaming/data/PythonDualInputStreamer.java |   5 +-
 .../api/streaming/data/PythonReceiver.java      |  29 ++--
 .../python/api/streaming/data/PythonSender.java |  36 ++---
 .../streaming/data/PythonSingleInputSender.java |   6 +
 .../data/PythonSingleInputStreamer.java         |   9 +-
 .../api/streaming/data/PythonStreamer.java      |  39 ++---
 .../api/streaming/plan/PythonPlanStreamer.java  |  12 +-
 .../python/api/flink/connection/Connection.py   |  21 ++-
 .../api/flink/functions/CoGroupFunction.py      |   4 +-
 .../python/api/flink/functions/Function.py      |   4 +-
 .../api/flink/functions/GroupReduceFunction.py  |   4 +-
 .../api/flink/functions/ReduceFunction.py       |   4 +-
 .../flink/python/api/flink/plan/Environment.py  |   5 +-
 .../flink/python/api/PythonPlanBinderTest.java  |  12 +-
 26 files changed, 322 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/docs/dev/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md
index 09a4fa8..c4c2671 100644
--- a/docs/dev/batch/python.md
+++ b/docs/dev/batch/python.md
@@ -149,8 +149,7 @@ Apart from setting up Flink, no additional work is 
required. The python package
 
 The Python API was tested on Linux/Windows systems that have Python 2.7 or 3.4 
installed.
 
-By default Flink will start python processes by calling "python" or "python3", 
depending on which start-script
-was used. By setting the "python.binary.python[2/3]" key in the 
flink-conf.yaml you can modify this behaviour to use a binary of your choice.
+By default Flink will start python processes by calling "python". By setting 
the "python.binary.path" key in the flink-conf.yaml you can modify this 
behaviour to use a binary of your choice.
 
 {% top %}
 
@@ -624,12 +623,11 @@ Executing Plans
 ---------------
 
 To run the plan with Flink, go to your Flink distribution, and run the 
pyflink.sh script from the /bin folder.
-use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script 
containing the plan has to be passed
-as the first argument, followed by a number of additional python packages, and 
finally, separated by - additional
-arguments that will be fed to the script.
+The script containing the plan has to be passed as the first argument, 
followed by a number of additional python
+packages, and finally, separated by - additional arguments that will be fed to 
the script.
 
 {% highlight python %}
-./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - 
<param1>[ <paramX>]]
+./bin/pyflink.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ 
<paramX>]]
 {% endhighlight %}
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink.bat 
b/flink-dist/src/main/flink-bin/bin/pyflink.bat
new file mode 100644
index 0000000..da180f3
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink.bat
@@ -0,0 +1,25 @@
+::###############################################################################
+::  Licensed to the Apache Software Foundation (ASF) under one
+::  or more contributor license agreements.  See the NOTICE file
+::  distributed with this work for additional information
+::  regarding copyright ownership.  The ASF licenses this file
+::  to you under the Apache License, Version 2.0 (the
+::  "License"); you may not use this file except in compliance
+::  with the License.  You may obtain a copy of the License at
+::
+::      http://www.apache.org/licenses/LICENSE-2.0
+::
+::  Unless required by applicable law or agreed to in writing, software
+::  distributed under the License is distributed on an "AS IS" BASIS,
+::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+::  See the License for the specific language governing permissions and
+:: limitations under the License.
+::###############################################################################
+
+@echo off
+setlocal EnableDelayedExpansion
+
+SET bin=%~dp0
+SET FLINK_ROOT_DIR=%bin%..
+
+"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar %*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink.sh 
b/flink-dist/src/main/flink-bin/bin/pyflink.sh
new file mode 100644
index 0000000..37679d3
--- /dev/null
+++ b/flink-dist/src/main/flink-bin/bin/pyflink.sh
@@ -0,0 +1,25 @@
+#!/bin/bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink2.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.bat 
b/flink-dist/src/main/flink-bin/bin/pyflink2.bat
deleted file mode 100644
index 1122ed4..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink2.bat
+++ /dev/null
@@ -1,25 +0,0 @@
-::###############################################################################
-::  Licensed to the Apache Software Foundation (ASF) under one
-::  or more contributor license agreements.  See the NOTICE file
-::  distributed with this work for additional information
-::  regarding copyright ownership.  The ASF licenses this file
-::  to you under the Apache License, Version 2.0 (the
-::  "License"); you may not use this file except in compliance
-::  with the License.  You may obtain a copy of the License at
-::
-::      http://www.apache.org/licenses/LICENSE-2.0
-::
-::  Unless required by applicable law or agreed to in writing, software
-::  distributed under the License is distributed on an "AS IS" BASIS,
-::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-::  See the License for the specific language governing permissions and
-:: limitations under the License.
-::###############################################################################
-
-@echo off
-setlocal EnableDelayedExpansion
-
-SET bin=%~dp0
-SET FLINK_ROOT_DIR=%bin%..
-
-"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar 2 
%*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink2.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.sh 
b/flink-dist/src/main/flink-bin/bin/pyflink2.sh
deleted file mode 100644
index 8a326e9..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink2.sh
+++ /dev/null
@@ -1,25 +0,0 @@
-#!/bin/bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "2" "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink3.bat
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.bat 
b/flink-dist/src/main/flink-bin/bin/pyflink3.bat
deleted file mode 100644
index 0294e37..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink3.bat
+++ /dev/null
@@ -1,25 +0,0 @@
-::###############################################################################
-::  Licensed to the Apache Software Foundation (ASF) under one
-::  or more contributor license agreements.  See the NOTICE file
-::  distributed with this work for additional information
-::  regarding copyright ownership.  The ASF licenses this file
-::  to you under the Apache License, Version 2.0 (the
-::  "License"); you may not use this file except in compliance
-::  with the License.  You may obtain a copy of the License at
-::
-::      http://www.apache.org/licenses/LICENSE-2.0
-::
-::  Unless required by applicable law or agreed to in writing, software
-::  distributed under the License is distributed on an "AS IS" BASIS,
-::  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-::  See the License for the specific language governing permissions and
-:: limitations under the License.
-::###############################################################################
-
-@echo off
-setlocal EnableDelayedExpansion
-
-SET bin=%~dp0
-SET FLINK_ROOT_DIR=%bin%..
-
-"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar 3 
%*
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink3.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.sh 
b/flink-dist/src/main/flink-bin/bin/pyflink3.sh
deleted file mode 100644
index d2d1991..0000000
--- a/flink-dist/src/main/flink-bin/bin/pyflink3.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-bin=`dirname "$0"`
-bin=`cd "$bin"; pwd`
-
-. "$bin"/config.sh
-
-
-"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "3" "$@"

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
new file mode 100644
index 0000000..de053a0
--- /dev/null
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api;
+
+import org.apache.flink.configuration.ConfigOption;
+
+import java.io.File;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/**
+ * Configuration options for the Python API.
+ */
+public class PythonOptions {
+
+       /**
+        * The config parameter defining the path to the python binary to use.
+        */
+       public static final ConfigOption<String> PYTHON_BINARY_PATH =
+               key("python.binary.path")
+                       .defaultValue("python")
+               .withDeprecatedKeys("python.binary.python2", 
"python.binary.python3");
+
+       /**
+        * The config parameter defining the size of the memory-mapped files, 
in kb.
+        * This value must be large enough to ensure that the largest 
serialized record can be written completely into
+        * the file.
+        * 
+        * Every task will allocate 2 memory-files, each with this size.
+        */
+       public static final ConfigOption<Long> MMAP_FILE_SIZE =
+               key("python.mmap.size.kb")
+                       .defaultValue(4L);
+
+       /**
+        * The config parameter defining where temporary plan-related files are 
stored on the client.
+        */
+       public static final ConfigOption<String> PLAN_TMP_DIR =
+               key("python.plan.tmp.dir")
+                       .noDefaultValue();
+
+       /**
+        * The config parameter defining where the memory-mapped files will be 
created.
+        */
+       public static final ConfigOption<String> DATA_TMP_DIR =
+               key("python.mmap.tmp.dir")
+                       .noDefaultValue();
+
+       /**
+        * The config parameter defining where the flink python library and 
user supplied files will be uploaded to before
+        * registering them with the Distributed Cache. This directory must be 
accessible from all worker nodes.
+        */
+       public static final ConfigOption<String> DC_TMP_DIR =
+               key("python.dc.tmp.dir")
+                       .defaultValue(System.getProperty("java.io.tmpdir") + 
File.separator + "flink_dc");
+
+       private PythonOptions() {
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 733a6fb..b6181b4 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -18,7 +18,6 @@ import 
org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
 import org.apache.flink.api.java.io.PrintingOutputFormat;
 import org.apache.flink.api.java.io.TupleCsvInputFormat;
 import org.apache.flink.api.java.operators.CoGroupRawOperator;
@@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.Random;
@@ -65,42 +63,30 @@ import static 
org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY;
 public class PythonPlanBinder {
        static final Logger LOG = 
LoggerFactory.getLogger(PythonPlanBinder.class);
 
-       public static final String ARGUMENT_PYTHON_2 = "2";
-       public static final String ARGUMENT_PYTHON_3 = "3";
-
        public static final String FLINK_PYTHON_DC_ID = "flink";
        public static final String FLINK_PYTHON_PLAN_NAME = File.separator + 
"plan.py";
 
-       public static final String FLINK_PYTHON2_BINARY_KEY = 
"python.binary.python2";
-       public static final String FLINK_PYTHON3_BINARY_KEY = 
"python.binary.python3";
        public static final String PLANBINDER_CONFIG_BCVAR_COUNT = 
"PLANBINDER_BCVAR_COUNT";
        public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = 
"PLANBINDER_BCVAR_";
-       public static String FLINK_PYTHON2_BINARY_PATH =
-               
GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, 
"python");
-       public static String FLINK_PYTHON3_BINARY_PATH =
-               
GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, 
"python3");
 
        private static final Random r = new Random();
 
-       public static final String FLINK_PYTHON_FILE_PATH = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
+       public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments";
+
        private static final String FLINK_PYTHON_REL_LOCAL_PATH = 
File.separator + "resources" + File.separator + "python";
-       private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
-       private static String FULL_PATH;
 
-       public static StringBuilder arguments = new StringBuilder();
+       private final Configuration operatorConfig;
 
-       public static boolean usePython3 = false;
+       private final String pythonLibraryPath;
 
-       private static String FLINK_HDFS_PATH = "hdfs:/tmp";
-       public static final String FLINK_TMP_DATA_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
+       private final String tmpPlanFilesDir;
+       private String tmpDistributedDir;
 
        private final SetCache sets = new SetCache();
        public ExecutionEnvironment env;
        private int currentEnvironmentID = 0;
        private PythonPlanStreamer streamer;
 
-       public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
-
        /**
         * Entry point for the execution of a python plan.
         *
@@ -112,23 +98,37 @@ public class PythonPlanBinder {
                        System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] 
<pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ 
<parameterX>]]");
                        return;
                }
-               usePython3 = args[0].equals(ARGUMENT_PYTHON_3);
-               PythonPlanBinder binder = new PythonPlanBinder();
-               binder.runPlan(Arrays.copyOfRange(args, 1, args.length));
-       }
 
-       public PythonPlanBinder() {
-               Configuration conf = GlobalConfiguration.loadConfiguration();
-               FLINK_PYTHON2_BINARY_PATH = 
conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-               FLINK_PYTHON3_BINARY_PATH = 
conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
-               FULL_PATH = FLINK_DIR != null
+               Configuration globalConfig = 
GlobalConfiguration.loadConfiguration();
+               PythonPlanBinder binder = new PythonPlanBinder(globalConfig);
+               binder.runPlan(args);
+       }
+
+       public PythonPlanBinder(Configuration globalConfig) {
+               String configuredPlanTmpPath = 
globalConfig.getString(PythonOptions.PLAN_TMP_DIR);
+               tmpPlanFilesDir = configuredPlanTmpPath != null
+                       ? configuredPlanTmpPath
+                       : System.getProperty("java.io.tmpdir") + File.separator 
+ "flink_plan_" + r.nextInt();
+               
+               tmpDistributedDir = 
globalConfig.getString(PythonOptions.DC_TMP_DIR);
+               
+               String flinkRootDir = System.getenv("FLINK_ROOT_DIR");
+               pythonLibraryPath = flinkRootDir != null
                                //command-line
-                               ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH
+                               ? flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH
                                //testing
-                               : new 
Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), 
"src/main/python/org/apache/flink/python/api").toString();
+                               : new File(System.getProperty("user.dir"), 
"src/main/python/org/apache/flink/python/api").getAbsolutePath();
+
+               operatorConfig = new Configuration();
+               operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, 
globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH));
+               String configuredTmpDataDir = 
globalConfig.getString(PythonOptions.DATA_TMP_DIR);
+               if (configuredTmpDataDir != null) {
+                       operatorConfig.setString(PythonOptions.DATA_TMP_DIR, 
configuredTmpDataDir);
+               }
+               operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, 
globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE));
        }
 
-       private void runPlan(String[] args) throws Exception {
+       void runPlan(String[] args) throws Exception {
                int split = 0;
                for (int x = 0; x < args.length; x++) {
                        if (args[x].compareTo("-") == 0) {
@@ -137,7 +137,7 @@ public class PythonPlanBinder {
                }
 
                try {
-                       String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
+                       String tmpPath = tmpPlanFilesDir;
                        prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split 
== 0 ? args.length : split));
                        startPython(tmpPath, Arrays.copyOfRange(args, split == 
0 ? args.length : split + 1, args.length));
 
@@ -145,10 +145,6 @@ public class PythonPlanBinder {
                        while (streamer.preparePlanMode()) {
                                receivePlan();
 
-                               if (env instanceof LocalEnvironment) {
-                                       FLINK_HDFS_PATH = "file:" + 
System.getProperty("java.io.tmpdir") + File.separator + "flink";
-                               }
-
                                distributeFiles(tmpPath, env);
                                JobExecutionResult jer = env.execute();
                                sendResult(jer);
@@ -167,8 +163,8 @@ public class PythonPlanBinder {
        
//=====Setup========================================================================================================
 
        /**
-        * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). 
This allows us to distribute it as one big
-        * package, and resolves PYTHONPATH issues.
+        * Copies all files to a common directory {@link 
PythonOptions#PLAN_TMP_DIR}). This allows us to distribute it as
+        * one big package which resolves PYTHONPATH issues.
         *
         * @param filePaths
         * @throws IOException
@@ -177,7 +173,7 @@ public class PythonPlanBinder {
        private void prepareFiles(String tempFilePath, String... filePaths) 
throws IOException, URISyntaxException {
                //Flink python package
                clearPath(tempFilePath);
-               FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), 
false);
+               FileCache.copy(new Path(pythonLibraryPath), new 
Path(tmpPlanFilesDir), false);
 
                //plan file             
                copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME);
@@ -206,17 +202,21 @@ public class PythonPlanBinder {
                FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new 
Path(tmpFilePath), true);
        }
 
-       private static void distributeFiles(String tmpPath, 
ExecutionEnvironment env) throws IOException, URISyntaxException {
-               clearPath(FLINK_HDFS_PATH);
-               FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), 
true);
-               env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
+       private void distributeFiles(String tmpPath, ExecutionEnvironment env) 
throws IOException {
+               clearPath(tmpDistributedDir);
+               FileCache.copy(new Path(tmpPath), new Path(tmpDistributedDir), 
true);
+               env.registerCachedFile(new 
Path(tmpDistributedDir).toUri().toString(), FLINK_PYTHON_DC_ID);
        }
 
        private void startPython(String tempPath, String[] args) throws 
IOException {
+               StringBuilder arguments = new StringBuilder();
                for (String arg : args) {
                        arguments.append(" ").append(arg);
                }
-               streamer = new PythonPlanStreamer();
+
+               operatorConfig.setString(PLAN_ARGUMENTS_KEY, 
arguments.toString());
+
+               streamer = new PythonPlanStreamer(operatorConfig);
                streamer.open(tempPath, arguments.toString());
        }
 
@@ -227,17 +227,15 @@ public class PythonPlanBinder {
 
        private void close() {
                try { //prevent throwing exception so that previous exceptions 
aren't hidden.
-                       FileSystem hdfs = FileSystem.get(new 
URI(FLINK_HDFS_PATH));
-                       hdfs.delete(new Path(FLINK_HDFS_PATH), true);
+                       FileSystem hdfs = new 
Path(tmpDistributedDir).getFileSystem();
+                       hdfs.delete(new Path(tmpDistributedDir), true);
 
                        FileSystem local = FileSystem.getLocalFileSystem();
-                       local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
-                       local.delete(new Path(FLINK_TMP_DATA_DIR), true);
+                       local.delete(new Path(tmpPlanFilesDir), true);
                        streamer.close();
                } catch (NullPointerException ignored) {
                } catch (IOException ioe) {
                        LOG.error("PythonAPI file cleanup failed. {}", 
ioe.getMessage());
-               } catch (URISyntaxException use) { // can't occur
                }
        }
 
@@ -271,7 +269,10 @@ public class PythonPlanBinder {
                                        env.setParallelism(dop);
                                        break;
                                case MODE:
-                                       FLINK_HDFS_PATH = 
value.<Boolean>getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink";
+                                       if (value.<Boolean>getField(1)) {
+                                               LOG.info("Local execution 
specified, using default for {}.", PythonOptions.DC_TMP_DIR);
+                                               tmpDistributedDir = 
PythonOptions.DC_TMP_DIR.defaultValue();
+                                       }
                                        break;
                                case RETRY:
                                        int retry = value.<Integer>getField(1);
@@ -527,7 +528,7 @@ public class PythonPlanBinder {
                DataSet<IN2> op2 = sets.getDataSet(info.otherID);
                Keys.ExpressionKeys<IN1> key1 = new 
Keys.ExpressionKeys<>(info.keys1, op1.getType());
                Keys.ExpressionKeys<IN2> key2 = new 
Keys.ExpressionKeys<>(info.keys2, op2.getType());
-               PythonCoGroup<IN1, IN2, OUT> pcg = new 
PythonCoGroup<>(info.envID, info.setID, type);
+               PythonCoGroup<IN1, IN2, OUT> pcg = new 
PythonCoGroup<>(operatorConfig, info.envID, info.setID, type);
                sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, 
key2, pcg, type, info.name).setParallelism(getParallelism(info)));
        }
 
@@ -552,7 +553,9 @@ public class PythonPlanBinder {
 
                defaultResult.setParallelism(getParallelism(info));
                if (info.usesUDF) {
-                       sets.add(info.setID, defaultResult.mapPartition(new 
PythonMapPartition<Tuple2<IN1, IN2>, OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name));
+                       sets.add(info.setID, defaultResult
+                               .mapPartition(new 
PythonMapPartition<Tuple2<IN1, IN2>, OUT>(operatorConfig, info.envID, 
info.setID, type))
+                               
.setParallelism(getParallelism(info)).name(info.name));
                } else {
                        sets.add(info.setID, 
defaultResult.name("DefaultCross"));
                }
@@ -560,12 +563,16 @@ public class PythonPlanBinder {
 
        private <IN, OUT> void createFilterOperation(PythonOperationInfo info, 
TypeInformation<OUT> type) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
-               sets.add(info.setID, op1.mapPartition(new 
PythonMapPartition<IN, OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name));
+               sets.add(info.setID, op1
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name));
        }
 
        private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, 
TypeInformation<OUT> type) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
-               sets.add(info.setID, op1.mapPartition(new 
PythonMapPartition<IN, OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name));
+               sets.add(info.setID, op1
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name));
        }
 
        private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -581,19 +588,22 @@ public class PythonPlanBinder {
        private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> 
op1, PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
                        .reduceGroup(new 
IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
-                       .mapPartition(new PythonMapPartition<IN, 
OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name);
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name);
        }
 
        private <IN, OUT> DataSet<OUT> 
applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, 
TypeInformation<OUT> type) {
                return op1
                        .reduceGroup(new 
IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
-                       .mapPartition(new PythonMapPartition<IN, 
OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name);
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name);
        }
 
        private <IN, OUT> DataSet<OUT> 
applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, 
TypeInformation<OUT> type) {
                return op1
                        .reduceGroup(new 
IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
-                       .mapPartition(new PythonMapPartition<IN, 
OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name);
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name);
        }
 
        private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, 
PythonOperationInfo info, TypeInformation<OUT> type) {
@@ -602,7 +612,8 @@ public class PythonPlanBinder {
 
                if (info.usesUDF) {
                        sets.add(info.setID, createDefaultJoin(op1, op2, 
info.keys1, info.keys2, mode, getParallelism(info))
-                               .mapPartition(new 
PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name));
+                               .mapPartition(new 
PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(operatorConfig, info.envID, 
info.setID, type))
+                               
.setParallelism(getParallelism(info)).name(info.name));
                } else {
                        sets.add(info.setID, createDefaultJoin(op1, op2, 
info.keys1, info.keys2, mode, getParallelism(info)));
                }
@@ -629,12 +640,16 @@ public class PythonPlanBinder {
 
        private <IN, OUT> void createMapOperation(PythonOperationInfo info, 
TypeInformation<OUT> type) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
-               sets.add(info.setID, op1.mapPartition(new 
PythonMapPartition<IN, OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name));
+               sets.add(info.setID, op1
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name));
        }
 
        private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo 
info, TypeInformation<OUT> type) {
                DataSet<IN> op1 = sets.getDataSet(info.parentID);
-               sets.add(info.setID, op1.mapPartition(new 
PythonMapPartition<IN, OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name));
+               sets.add(info.setID, op1
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name));
        }
 
        private void createReduceOperation(PythonOperationInfo info) {
@@ -650,12 +665,14 @@ public class PythonPlanBinder {
        private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, 
PythonOperationInfo info, TypeInformation<OUT> type) {
                return op1
                        .reduceGroup(new 
IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
-                       .mapPartition(new PythonMapPartition<IN, 
OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name);
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name);
        }
 
        private <IN, OUT> DataSet<OUT> 
applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, 
TypeInformation<OUT> type) {
                return op1
                        .reduceGroup(new 
IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
-                       .mapPartition(new PythonMapPartition<IN, 
OUT>(info.envID, info.setID, 
type)).setParallelism(getParallelism(info)).name(info.name);
+                       .mapPartition(new PythonMapPartition<IN, 
OUT>(operatorConfig, info.envID, info.setID, type))
+                       .setParallelism(getParallelism(info)).name(info.name);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index ff5a8d4..a5e3e75 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -35,9 +35,9 @@ public class PythonCoGroup<IN1, IN2, OUT> extends 
RichCoGroupFunction<IN1, IN2,
        private final PythonDualInputStreamer<IN1, IN2, OUT> streamer;
        private final transient TypeInformation<OUT> typeInformation;
 
-       public PythonCoGroup(int envID, int setID, TypeInformation<OUT> 
typeInformation) {
+       public PythonCoGroup(Configuration config, int envID, int setID, 
TypeInformation<OUT> typeInformation) {
                this.typeInformation = typeInformation;
-               streamer = new PythonDualInputStreamer<>(this, envID, setID, 
typeInformation instanceof PrimitiveArrayTypeInfo);
+               streamer = new PythonDualInputStreamer<>(this, config, envID, 
setID, typeInformation instanceof PrimitiveArrayTypeInfo);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index 9142581..207ead9 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -35,9 +35,9 @@ public class PythonMapPartition<IN, OUT> extends 
RichMapPartitionFunction<IN, OU
        private final PythonSingleInputStreamer<IN, OUT> streamer;
        private final transient TypeInformation<OUT> typeInformation;
 
-       public PythonMapPartition(int envId, int setId, TypeInformation<OUT> 
typeInformation) {
+       public PythonMapPartition(Configuration config, int envId, int setId, 
TypeInformation<OUT> typeInformation) {
                this.typeInformation = typeInformation;
-               streamer = new PythonSingleInputStreamer<>(this, envId, setId, 
typeInformation instanceof PrimitiveArrayTypeInfo);
+               streamer = new PythonSingleInputStreamer<>(this, config, envId, 
setId, typeInformation instanceof PrimitiveArrayTypeInfo);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
index 3b8e423..a16f522 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.io.IOException;
 
 /**
@@ -32,6 +34,10 @@ public class PythonDualInputSender<IN1, IN2> extends 
PythonSender {
        private transient Serializer<IN1> serializer1;
        private transient Serializer<IN2> serializer2;
 
+       protected PythonDualInputSender(Configuration config) {
+               super(config);
+       }
+
        /**
         * Extracts records from an iterator and writes them to the 
memory-mapped file. This method assumes that all values
         * in the iterator are of the same type. This method does NOT take care 
of synchronization. The caller must

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
index 2e9ba2c..b7e8a25 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -35,8 +36,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends 
PythonStreamer<Pytho
 
        private static final long serialVersionUID = -607175070491761873L;
 
-       public PythonDualInputStreamer(AbstractRichFunction function, int 
envID, int setID, boolean usesByteArray) {
-               super(function, envID, setID, usesByteArray, new 
PythonDualInputSender<IN1, IN2>());
+       public PythonDualInputStreamer(AbstractRichFunction function, 
Configuration config, int envID, int setID, boolean usesByteArray) {
+               super(function, config, envID, setID, usesByteArray, new 
PythonDualInputSender<IN1, IN2>(config));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index 838a261..c7c1f7a 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -20,8 +20,9 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.util.Collector;
 
 /**
@@ -30,43 +31,40 @@ import org.apache.flink.util.Collector;
 public class PythonReceiver<OUT> implements Serializable {
        private static final long serialVersionUID = -2474088929850009968L;
 
-       private transient File inputFile;
        private transient RandomAccessFile inputRAF;
        private transient FileChannel inputChannel;
        private transient MappedByteBuffer fileBuffer;
 
+       private final long mappedFileSizeBytes;
+
        private final boolean readAsByteArray;
 
        private transient Deserializer<OUT> deserializer;
 
-       public PythonReceiver(boolean usesByteArray) {
+       public PythonReceiver(Configuration config, boolean usesByteArray) {
                readAsByteArray = usesByteArray;
+               mappedFileSizeBytes = 
config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
        }
 
        
//=====Setup========================================================================================================
 
        @SuppressWarnings("unchecked")
-       public void open(String path) throws IOException {
-               setupMappedFile(path);
+       public void open(File inputFile) throws IOException {
                deserializer = (Deserializer<OUT>) (readAsByteArray ? new 
ByteArrayDeserializer() : new TupleDeserializer());
-       }
 
-       private void setupMappedFile(String inputFilePath) throws IOException {
-               File x = new File(FLINK_TMP_DATA_DIR);
-               x.mkdirs();
+               inputFile.getParentFile().mkdirs();
 
-               inputFile = new File(inputFilePath);
                if (inputFile.exists()) {
                        inputFile.delete();
                }
                inputFile.createNewFile();
-               inputRAF = new RandomAccessFile(inputFilePath, "rw");
-               inputRAF.setLength(MAPPED_FILE_SIZE);
-               inputRAF.seek(MAPPED_FILE_SIZE - 1);
+               inputRAF = new RandomAccessFile(inputFile, "rw");
+               inputRAF.setLength(mappedFileSizeBytes);
+               inputRAF.seek(mappedFileSizeBytes - 1);
                inputRAF.writeByte(0);
                inputRAF.seek(0);
                inputChannel = inputRAF.getChannel();
-               fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, MAPPED_FILE_SIZE);
+               fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, mappedFileSizeBytes);
        }
 
        public void close() throws IOException {
@@ -76,7 +74,6 @@ public class PythonReceiver<OUT> implements Serializable {
        private void closeMappedFile() throws IOException {
                inputChannel.close();
                inputRAF.close();
-               inputFile.delete();
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index 9ada758..3d13271 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -14,6 +14,8 @@ package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
 
 import java.io.File;
 import java.io.IOException;
@@ -23,9 +25,6 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
-import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
-
 /**
  * General-purpose class to write data to memory-mapped files.
  */
@@ -37,32 +36,36 @@ public abstract class PythonSender implements Serializable {
        public static final byte TYPE_KEY_VALUE = 62;
        public static final byte TYPE_VALUE_VALUE = 61;
 
-       private transient File outputFile;
        private transient RandomAccessFile outputRAF;
        private transient FileChannel outputChannel;
        private transient MappedByteBuffer fileBuffer;
 
-       
//=====Setup========================================================================================================
-       public void open(String path) throws IOException {
-               setupMappedFile(path);
+       private final long mappedFileSizeBytes;
+       
+       private final Configuration config;
+
+       protected PythonSender(Configuration config) {
+               this.config = config;
+               mappedFileSizeBytes = 
config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10;
        }
 
-       private void setupMappedFile(String outputFilePath) throws IOException {
-               File x = new File(FLINK_TMP_DATA_DIR);
-               x.mkdirs();
+       
//=====Setup========================================================================================================
+       public void open(File outputFile) throws IOException {
+               outputFile.getParentFile().mkdirs();
 
-               outputFile = new File(outputFilePath);
                if (outputFile.exists()) {
                        outputFile.delete();
                }
                outputFile.createNewFile();
-               outputRAF = new RandomAccessFile(outputFilePath, "rw");
-               outputRAF.setLength(MAPPED_FILE_SIZE);
-               outputRAF.seek(MAPPED_FILE_SIZE - 1);
+               outputRAF = new RandomAccessFile(outputFile, "rw");
+
+               
+               outputRAF.setLength(mappedFileSizeBytes);
+               outputRAF.seek(mappedFileSizeBytes - 1);
                outputRAF.writeByte(0);
                outputRAF.seek(0);
                outputChannel = outputRAF.getChannel();
-               fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, MAPPED_FILE_SIZE);
+               fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 
0, mappedFileSizeBytes);
        }
 
        public void close() throws IOException {
@@ -72,7 +75,6 @@ public abstract class PythonSender implements Serializable {
        private void closeMappedFile() throws IOException {
                outputChannel.close();
                outputRAF.close();
-               outputFile.delete();
        }
 
        
//=====IO===========================================================================================================
@@ -92,7 +94,7 @@ public abstract class PythonSender implements Serializable {
                while (input.hasNext()) {
                        IN value = input.next();
                        ByteBuffer bb = serializer.serialize(value);
-                       if (bb.remaining() > MAPPED_FILE_SIZE) {
+                       if (bb.remaining() > mappedFileSizeBytes) {
                                throw new RuntimeException("Serialized object 
does not fit into a single buffer.");
                        }
                        if (bb.remaining() <= fileBuffer.remaining()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
index 42a1799..74d0604 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.io.IOException;
 
 /**
@@ -30,6 +32,10 @@ public class PythonSingleInputSender<IN> extends 
PythonSender {
 
        private transient Serializer<IN> serializer;
 
+       protected PythonSingleInputSender(Configuration config) {
+               super(config);
+       }
+
        /**
         * Extracts records from an iterator and writes them to the 
memory-mapped file. This method assumes that all values
         * in the iterator are of the same type. This method does NOT take care 
of synchronization. The caller must

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
index d013111..e7f018c 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -18,6 +18,7 @@
 package org.apache.flink.python.api.streaming.data;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
 import java.io.IOException;
@@ -33,8 +34,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends 
PythonStreamer<PythonSin
 
        private static final long serialVersionUID = -5149905918522069034L;
 
-       public PythonSingleInputStreamer(AbstractRichFunction function, int 
envID, int setID, boolean usesByteArray) {
-               super(function, envID, setID, usesByteArray, new 
PythonSingleInputSender<IN>());
+       public PythonSingleInputStreamer(AbstractRichFunction function, 
Configuration config, int envID, int setID, boolean usesByteArray) {
+               super(function, config, envID, setID, usesByteArray, new 
PythonSingleInputSender<IN>(config));
        }
 
        /**
@@ -83,7 +84,9 @@ public class PythonSingleInputStreamer<IN, OUT> extends 
PythonStreamer<PythonSin
                                }
                        }
                } catch (SocketTimeoutException ignored) {
-                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " stopped responding." + 
msg.get());
+               } catch (Exception e) {
+                       throw new RuntimeException("Critical failure. " + 
msg.get(), e);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 006a1b2..97d5780 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -15,7 +15,7 @@ package org.apache.flink.python.api.streaming.data;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.PythonPlanBinder;
+import org.apache.flink.python.api.PythonOptions;
 import 
org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
 import 
org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
@@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
@@ -34,13 +35,11 @@ import java.net.SocketTimeoutException;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
 import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
 import static 
org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT;
 import static 
org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX;
+import static org.apache.flink.python.api.PythonPlanBinder.PLAN_ARGUMENTS_KEY;
 
 /**
  * This streamer is used by functions to send/receive data to/from an external 
python process.
@@ -56,10 +55,9 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
        protected static final int SIGNAL_ERROR = -2;
        protected static final byte SIGNAL_LAST = 32;
 
+       private final Configuration config;
        private final int envID;
        private final int setID;
-       private final boolean usePython3;
-       private final String planArguments;
 
        private transient Process process;
        private transient Thread shutdownThread;
@@ -79,12 +77,11 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
        protected transient Thread outPrinter;
        protected transient Thread errorPrinter;
 
-       public PythonStreamer(AbstractRichFunction function, int envID, int 
setID, boolean usesByteArray, S sender) {
+       public PythonStreamer(AbstractRichFunction function, Configuration 
config, int envID, int setID, boolean usesByteArray, S sender) {
+               this.config = config;
                this.envID = envID;
                this.setID = setID;
-               this.usePython3 = PythonPlanBinder.usePython3;
-               planArguments = PythonPlanBinder.arguments.toString();
-               receiver = new PythonReceiver<>(usesByteArray);
+               this.receiver = new PythonReceiver<>(config, usesByteArray);
                this.function = function;
                this.sender = sender;
        }
@@ -101,16 +98,21 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
        }
 
        private void startPython() throws IOException {
-               String outputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" 
+ setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-               String inputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + 
setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+               String tmpDir = config.getString(PythonOptions.DATA_TMP_DIR);
+               if (tmpDir == null) {
+                       tmpDir = System.getProperty("java.io.tmpdir");
+               }
+               File outputFile = new File(tmpDir, envID + "_" + setID + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "_output");
+               File inputFile = new File(tmpDir, envID + "_" + setID + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "_input");
 
-               sender.open(inputFilePath);
-               receiver.open(outputFilePath);
+               sender.open(inputFile);
+               receiver.open(outputFile);
 
                String path = 
function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
+
                String planPath = path + FLINK_PYTHON_PLAN_NAME;
 
-               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+               String pythonBinaryPath = 
config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
                try {
                        Runtime.getRuntime().exec(pythonBinaryPath);
@@ -118,7 +120,7 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
                        throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
                }
 
-               process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B 
" + planPath + planArguments);
+               process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B 
" + planPath + config.getString(PLAN_ARGUMENTS_KEY, ""));
                outPrinter = new Thread(new 
StreamPrinter(process.getInputStream()));
                outPrinter.start();
                errorPrinter = new Thread(new 
StreamPrinter(process.getErrorStream(), msg));
@@ -143,8 +145,9 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
                processOutput.write(("" + server.getLocalPort() + 
"\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
                
processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() 
+ "\n")
                        .getBytes(ConfigConstants.DEFAULT_CHARSET));
-               processOutput.write((inputFilePath + 
"\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
-               processOutput.write((outputFilePath + 
"\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+               
processOutput.write(((config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10) + 
"\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+               processOutput.write((inputFile + 
"\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+               processOutput.write((outputFile + 
"\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
                processOutput.flush();
 
                while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index 9b62563..9e93dda 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -13,6 +13,8 @@
 package org.apache.flink.python.api.streaming.plan;
 
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.api.PythonOptions;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,10 +24,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
-import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
-import static org.apache.flink.python.api.PythonPlanBinder.usePython3;
 
 /**
  * Generic class to exchange data during the plan phase.
@@ -33,6 +32,7 @@ import static 
org.apache.flink.python.api.PythonPlanBinder.usePython3;
 public class PythonPlanStreamer {
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(PythonPlanStreamer.class);
+       private final Configuration config;
 
        protected PythonPlanSender sender;
        protected PythonPlanReceiver receiver;
@@ -40,6 +40,10 @@ public class PythonPlanStreamer {
        private Process process;
        private ServerSocket server;
        private Socket socket;
+       
+       public PythonPlanStreamer(Configuration config) {
+               this.config = config;
+       }
 
        public Object getRecord() throws IOException {
                return getRecord(false);
@@ -58,7 +62,7 @@ public class PythonPlanStreamer {
        }
 
        private void startPython(String tmpPath, String args) throws 
IOException {
-               String pythonBinaryPath = usePython3 ? 
FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
+               String pythonBinaryPath = 
config.getString(PythonOptions.PYTHON_BINARY_PATH);
 
                try {
                        Runtime.getRuntime().exec(pythonBinaryPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
index 293f5e9..09e6b36 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py
@@ -23,8 +23,6 @@ import sys
 PY2 = sys.version_info[0] == 2
 PY3 = sys.version_info[0] == 3
 
-MAPPED_FILE_SIZE = 1024 * 1024 * 64
-
 SIGNAL_REQUEST_BUFFER = b"\x00\x00\x00\x00"
 SIGNAL_REQUEST_BUFFER_G0 = b"\xFF\xFF\xFF\xFD"
 SIGNAL_REQUEST_BUFFER_G1 = b"\xFF\xFF\xFF\xFC"
@@ -67,15 +65,16 @@ class PureTCPConnection(object):
 
 
 class BufferingTCPMappedFileConnection(object):
-    def __init__(self, input_file, output_file, port):
+    def __init__(self, input_file, output_file, mmap_size, port):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
+        self._mmap_size = mmap_size
         if hasattr(mmap, 'MAP_SHARED'):
-            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), 
MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), 
mmap_size, mmap.MAP_SHARED, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
mmap_size, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
         else:
-            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), 
MAPPED_FILE_SIZE, None, mmap.ACCESS_READ)
-            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE)
+            self._file_input_buffer = mmap.mmap(self._input_file.fileno(), 
mmap_size, None, mmap.ACCESS_READ)
+            self._file_output_buffer = mmap.mmap(self._output_file.fileno(), 
mmap_size, None, mmap.ACCESS_WRITE)
         self._socket = SOCKET.socket(family=SOCKET.AF_INET, 
type=SOCKET.SOCK_STREAM)
         self._socket.connect((SOCKET.gethostbyname("localhost"), port))
 
@@ -92,10 +91,10 @@ class BufferingTCPMappedFileConnection(object):
 
     def write(self, msg):
         length = len(msg)
-        if length > MAPPED_FILE_SIZE:
+        if length > self._mmap_size:
             raise Exception("Serialized object does not fit into a single 
buffer.")
         tmp = self._out_size + length
-        if tmp > MAPPED_FILE_SIZE:
+        if tmp > self._mmap_size:
             self._write_buffer()
             self.write(msg)
         else:
@@ -150,8 +149,8 @@ class BufferingTCPMappedFileConnection(object):
 
 
 class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
-    def __init__(self, input_file, output_file, port):
-        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, 
output_file, port)
+    def __init__(self, input_file, output_file, mmap_size, port):
+        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, 
output_file, mmap_size, port)
         self._input = [b"", b""]
         self._input_offset = [0, 0]
         self._input_size = [0, 0]

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
index 83f563b..edd2c61 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py
@@ -25,8 +25,8 @@ class CoGroupFunction(Function.Function):
         self._keys1 = None
         self._keys2 = None
 
-    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
-        self._connection = 
Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, 
subtask_index):
+        self._connection = 
Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, 
mmap_size, port)
         self._iterator = Iterator.Iterator(self._connection, env, 0)
         self._iterator2 = Iterator.Iterator(self._connection, env, 1)
         self._cgiter = Iterator.CoGroupIterator(self._iterator, 
self._iterator2, self._keys1, self._keys2)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
index 45a0f2e..a70a359 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py
@@ -32,8 +32,8 @@ class Function(object):
         self.context = None
         self._env = None
 
-    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
-        self._connection = 
Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, 
subtask_index):
+        self._connection = 
Connection.BufferingTCPMappedFileConnection(input_file, output_file, mmap_size, 
port)
         self._iterator = Iterator.Iterator(self._connection, env)
         self._collector = Collector.Collector(self._connection, env, info)
         self.context = RuntimeContext.RuntimeContext(self._iterator, 
self._collector, subtask_index)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
index 77b53a2..5a353a1 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py
@@ -25,8 +25,8 @@ class GroupReduceFunction(Function.Function):
     def __init__(self):
         super(GroupReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
-        super(GroupReduceFunction, self)._configure(input_file, output_file, 
port, env, info, subtask_index)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, 
subtask_index):
+        super(GroupReduceFunction, self)._configure(input_file, output_file, 
mmap_size, port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_group_reduce
         else:

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
index 08af276..5acabe0 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py
@@ -24,8 +24,8 @@ class ReduceFunction(Function.Function):
     def __init__(self):
         super(ReduceFunction, self).__init__()
 
-    def _configure(self, input_file, output_file, port, env, info, 
subtask_index):
-        super(ReduceFunction, self)._configure(input_file, output_file, port, 
env, info, subtask_index)
+    def _configure(self, input_file, output_file, mmap_size, port, env, info, 
subtask_index):
+        super(ReduceFunction, self)._configure(input_file, output_file, 
mmap_size, port, env, info, subtask_index)
         if len(info.key1) == 0:
             self._run = self._run_all_reduce
         else:

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 6e496de..797ae96 100644
--- 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ 
b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -234,6 +234,7 @@ class Environment(object):
 
                     port = int(sys.stdin.readline().rstrip('\n'))
                     subtask_index = int(sys.stdin.readline().rstrip('\n'))
+                    mmap_size = int(sys.stdin.readline().rstrip('\n'))
                     input_path = sys.stdin.readline().rstrip('\n')
                     output_path = sys.stdin.readline().rstrip('\n')
 
@@ -244,7 +245,7 @@ class Environment(object):
                         if set.id == id:
                             used_set = set
                             operator = set.operator
-                    operator._configure(input_path, output_path, port, self, 
used_set, subtask_index)
+                    operator._configure(input_path, output_path, mmap_size, 
port, self, used_set, subtask_index)
                     operator._go()
                     operator._close()
                     sys.stdout.flush()
@@ -252,7 +253,7 @@ class Environment(object):
             except:
                 sys.stdout.flush()
                 sys.stderr.flush()
-                if operator is not None:
+                if operator is not None and operator._connection is not None:
                     operator._connection._socket.send(struct.pack(">i", -2))
                 elif port is not None:
                     socket = SOCKET.socket(family=SOCKET.AF_INET, 
type=SOCKET.SOCK_STREAM)

http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
index d144298..ba8ea78 100644
--- 
a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
+++ 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java
@@ -12,6 +12,7 @@
  */
 package org.apache.flink.python.api;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -21,9 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_2;
-import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_3;
-
 public class PythonPlanBinderTest extends JavaProgramTestBase {
        
        @Override
@@ -75,12 +73,16 @@ public class PythonPlanBinderTest extends 
JavaProgramTestBase {
                String utils = findUtilsFile();
                if (isPython2Supported()) {
                        for (String file : findTestFiles()) {
-                               PythonPlanBinder.main(new 
String[]{ARGUMENT_PYTHON_2, file, utils});
+                               Configuration configuration = new 
Configuration();
+                               
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, "python");
+                               new PythonPlanBinder(configuration).runPlan(new 
String[]{file, utils});
                        }
                }
                if (isPython3Supported()) {
                        for (String file : findTestFiles()) {
-                               PythonPlanBinder.main(new 
String[]{ARGUMENT_PYTHON_3, file, utils});
+                               Configuration configuration = new 
Configuration();
+                               
configuration.setString(PythonOptions.PYTHON_BINARY_PATH, "python3");
+                               new PythonPlanBinder(configuration).runPlan(new 
String[]{file, utils});
                        }
                }
        }

Reply via email to