[FLINK-6229] [py] Rework configuration of PythonPlanBinder/Operators - unify python2/python3 configuration - explicitly pass on a configuration to each operator - port all configuration options to ConfigOptions - [FLINK-5516] Make all paths explicitly configurable - [FLINK-6230] Make mmap size configurable
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bdcebfda Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bdcebfda Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bdcebfda Branch: refs/heads/table-retraction Commit: bdcebfda06846a1e21bb6a4678909d503ebc6333 Parents: 940d16c Author: zentol <ches...@apache.org> Authored: Fri Mar 31 11:55:18 2017 +0200 Committer: zentol <ches...@apache.org> Committed: Thu Apr 6 10:57:11 2017 +0200 ---------------------------------------------------------------------- docs/dev/batch/python.md | 10 +- flink-dist/src/main/flink-bin/bin/pyflink.bat | 25 ++++ flink-dist/src/main/flink-bin/bin/pyflink.sh | 25 ++++ flink-dist/src/main/flink-bin/bin/pyflink2.bat | 25 ---- flink-dist/src/main/flink-bin/bin/pyflink2.sh | 25 ---- flink-dist/src/main/flink-bin/bin/pyflink3.bat | 25 ---- flink-dist/src/main/flink-bin/bin/pyflink3.sh | 26 ---- .../apache/flink/python/api/PythonOptions.java | 74 ++++++++++ .../flink/python/api/PythonPlanBinder.java | 143 +++++++++++-------- .../python/api/functions/PythonCoGroup.java | 4 +- .../api/functions/PythonMapPartition.java | 4 +- .../streaming/data/PythonDualInputSender.java | 6 + .../streaming/data/PythonDualInputStreamer.java | 5 +- .../api/streaming/data/PythonReceiver.java | 29 ++-- .../python/api/streaming/data/PythonSender.java | 36 ++--- .../streaming/data/PythonSingleInputSender.java | 6 + .../data/PythonSingleInputStreamer.java | 9 +- .../api/streaming/data/PythonStreamer.java | 39 ++--- .../api/streaming/plan/PythonPlanStreamer.java | 12 +- .../python/api/flink/connection/Connection.py | 21 ++- .../api/flink/functions/CoGroupFunction.py | 4 +- .../python/api/flink/functions/Function.py | 4 +- .../api/flink/functions/GroupReduceFunction.py | 4 +- .../api/flink/functions/ReduceFunction.py | 4 +- .../flink/python/api/flink/plan/Environment.py | 5 +- .../flink/python/api/PythonPlanBinderTest.java | 12 +- 26 files 
changed, 322 insertions(+), 260 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/docs/dev/batch/python.md ---------------------------------------------------------------------- diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md index 09a4fa8..c4c2671 100644 --- a/docs/dev/batch/python.md +++ b/docs/dev/batch/python.md @@ -149,8 +149,7 @@ Apart from setting up Flink, no additional work is required. The python package The Python API was tested on Linux/Windows systems that have Python 2.7 or 3.4 installed. -By default Flink will start python processes by calling "python" or "python3", depending on which start-script -was used. By setting the "python.binary.python[2/3]" key in the flink-conf.yaml you can modify this behaviour to use a binary of your choice. +By default Flink will start python processes by calling "python". By setting the "python.binary.path" key in the flink-conf.yaml you can modify this behaviour to use a binary of your choice. {% top %} @@ -624,12 +623,11 @@ Executing Plans --------------- To run the plan with Flink, go to your Flink distribution, and run the pyflink.sh script from the /bin folder. -use pyflink2.sh for python 2.7, and pyflink3.sh for python 3.4. The script containing the plan has to be passed -as the first argument, followed by a number of additional python packages, and finally, separated by - additional -arguments that will be fed to the script. +The script containing the plan has to be passed as the first argument, followed by a number of additional python +packages, and finally, separated by - additional arguments that will be fed to the script. 
{% highlight python %} -./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]] +./bin/pyflink.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]] {% endhighlight %} {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink.bat ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink.bat b/flink-dist/src/main/flink-bin/bin/pyflink.bat new file mode 100644 index 0000000..da180f3 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/pyflink.bat @@ -0,0 +1,25 @@ +::############################################################################### +:: Licensed to the Apache Software Foundation (ASF) under one +:: or more contributor license agreements. See the NOTICE file +:: distributed with this work for additional information +:: regarding copyright ownership. The ASF licenses this file +:: to you under the Apache License, Version 2.0 (the +:: "License"); you may not use this file except in compliance +:: with the License. You may obtain a copy of the License at +:: +:: http://www.apache.org/licenses/LICENSE-2.0 +:: +:: Unless required by applicable law or agreed to in writing, software +:: distributed under the License is distributed on an "AS IS" BASIS, +:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +:: See the License for the specific language governing permissions and +:: limitations under the License. +::############################################################################### + +@echo off +setlocal EnableDelayedExpansion + +SET bin=%~dp0 +SET FLINK_ROOT_DIR=%bin%.. 
+ +"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar %* \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink.sh b/flink-dist/src/main/flink-bin/bin/pyflink.sh new file mode 100644 index 0000000..37679d3 --- /dev/null +++ b/flink-dist/src/main/flink-bin/bin/pyflink.sh @@ -0,0 +1,25 @@ +#!/bin/bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. 
"$bin"/config.sh + +"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "$@" http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink2.bat ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.bat b/flink-dist/src/main/flink-bin/bin/pyflink2.bat deleted file mode 100644 index 1122ed4..0000000 --- a/flink-dist/src/main/flink-bin/bin/pyflink2.bat +++ /dev/null @@ -1,25 +0,0 @@ -::############################################################################### -:: Licensed to the Apache Software Foundation (ASF) under one -:: or more contributor license agreements. See the NOTICE file -:: distributed with this work for additional information -:: regarding copyright ownership. The ASF licenses this file -:: to you under the Apache License, Version 2.0 (the -:: "License"); you may not use this file except in compliance -:: with the License. You may obtain a copy of the License at -:: -:: http://www.apache.org/licenses/LICENSE-2.0 -:: -:: Unless required by applicable law or agreed to in writing, software -:: distributed under the License is distributed on an "AS IS" BASIS, -:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -:: See the License for the specific language governing permissions and -:: limitations under the License. -::############################################################################### - -@echo off -setlocal EnableDelayedExpansion - -SET bin=%~dp0 -SET FLINK_ROOT_DIR=%bin%.. 
- -"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar 2 %* \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink2.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink2.sh b/flink-dist/src/main/flink-bin/bin/pyflink2.sh deleted file mode 100644 index 8a326e9..0000000 --- a/flink-dist/src/main/flink-bin/bin/pyflink2.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -bin=`dirname "$0"` -bin=`cd "$bin"; pwd` - -. 
"$bin"/config.sh - -"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "2" "$@" http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink3.bat ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.bat b/flink-dist/src/main/flink-bin/bin/pyflink3.bat deleted file mode 100644 index 0294e37..0000000 --- a/flink-dist/src/main/flink-bin/bin/pyflink3.bat +++ /dev/null @@ -1,25 +0,0 @@ -::############################################################################### -:: Licensed to the Apache Software Foundation (ASF) under one -:: or more contributor license agreements. See the NOTICE file -:: distributed with this work for additional information -:: regarding copyright ownership. The ASF licenses this file -:: to you under the Apache License, Version 2.0 (the -:: "License"); you may not use this file except in compliance -:: with the License. You may obtain a copy of the License at -:: -:: http://www.apache.org/licenses/LICENSE-2.0 -:: -:: Unless required by applicable law or agreed to in writing, software -:: distributed under the License is distributed on an "AS IS" BASIS, -:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -:: See the License for the specific language governing permissions and -:: limitations under the License. -::############################################################################### - -@echo off -setlocal EnableDelayedExpansion - -SET bin=%~dp0 -SET FLINK_ROOT_DIR=%bin%.. 
- -"%FLINK_ROOT_DIR%\bin\flink" run -v "%FLINK_ROOT_DIR%"\lib\flink-python*.jar 3 %* \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-dist/src/main/flink-bin/bin/pyflink3.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/pyflink3.sh b/flink-dist/src/main/flink-bin/bin/pyflink3.sh deleted file mode 100644 index d2d1991..0000000 --- a/flink-dist/src/main/flink-bin/bin/pyflink3.sh +++ /dev/null @@ -1,26 +0,0 @@ -#!/bin/bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -bin=`dirname "$0"` -bin=`cd "$bin"; pwd` - -. 
"$bin"/config.sh - - -"$FLINK_BIN_DIR"/flink run -v "$FLINK_ROOT_DIR"/lib/flink-python*.jar "3" "$@" http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java new file mode 100644 index 0000000..de053a0 --- /dev/null +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOptions.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.python.api; + +import org.apache.flink.configuration.ConfigOption; + +import java.io.File; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * Configuration options for the Python API. + */ +public class PythonOptions { + + /** + * The config parameter defining the path to the python binary to use. 
+ */ + public static final ConfigOption<String> PYTHON_BINARY_PATH = + key("python.binary.path") + .defaultValue("python") + .withDeprecatedKeys("python.binary.python2", "python.binary.python3"); + + /** + * The config parameter defining the size of the memory-mapped files, in kb. + * This value must be large enough to ensure that the largest serialized record can be written completely into + * the file. + * + * Every task will allocate 2 memory-files, each with this size. + */ + public static final ConfigOption<Long> MMAP_FILE_SIZE = + key("python.mmap.size.kb") + .defaultValue(4L); + + /** + * The config parameter defining where temporary plan-related files are stored on the client. + */ + public static final ConfigOption<String> PLAN_TMP_DIR = + key("python.plan.tmp.dir") + .noDefaultValue(); + + /** + * The config parameter defining where the memory-mapped files will be created. + */ + public static final ConfigOption<String> DATA_TMP_DIR = + key("python.mmap.tmp.dir") + .noDefaultValue(); + + /** + * The config parameter defining where the flink python library and user supplied files will be uploaded to before + * registering them with the Distributed Cache. This directory must be accessible from all worker nodes. 
+ */ + public static final ConfigOption<String> DC_TMP_DIR = + key("python.dc.tmp.dir") + .defaultValue(System.getProperty("java.io.tmpdir") + File.separator + "flink_dc"); + + private PythonOptions() { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 733a6fb..b6181b4 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -18,7 +18,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.LocalEnvironment; import org.apache.flink.api.java.io.PrintingOutputFormat; import org.apache.flink.api.java.io.TupleCsvInputFormat; import org.apache.flink.api.java.operators.CoGroupRawOperator; @@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Random; @@ -65,42 +63,30 @@ import static org.apache.flink.python.api.PythonOperationInfo.DatasizeHint.TINY; public class PythonPlanBinder { static final Logger LOG = LoggerFactory.getLogger(PythonPlanBinder.class); - public static final String ARGUMENT_PYTHON_2 = "2"; - public static final String ARGUMENT_PYTHON_3 = "3"; - public static final String FLINK_PYTHON_DC_ID = "flink"; public static final String FLINK_PYTHON_PLAN_NAME = File.separator + "plan.py"; 
- public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2"; - public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3"; public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT"; public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_"; - public static String FLINK_PYTHON2_BINARY_PATH = - GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, "python"); - public static String FLINK_PYTHON3_BINARY_PATH = - GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, "python3"); private static final Random r = new Random(); - public static final String FLINK_PYTHON_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "flink_plan"; + public static final String PLAN_ARGUMENTS_KEY = "python.plan.arguments"; + private static final String FLINK_PYTHON_REL_LOCAL_PATH = File.separator + "resources" + File.separator + "python"; - private static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR"); - private static String FULL_PATH; - public static StringBuilder arguments = new StringBuilder(); + private final Configuration operatorConfig; - public static boolean usePython3 = false; + private final String pythonLibraryPath; - private static String FLINK_HDFS_PATH = "hdfs:/tmp"; - public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; + private final String tmpPlanFilesDir; + private String tmpDistributedDir; private final SetCache sets = new SetCache(); public ExecutionEnvironment env; private int currentEnvironmentID = 0; private PythonPlanStreamer streamer; - public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64; - /** * Entry point for the execution of a python plan. 
* @@ -112,23 +98,37 @@ public class PythonPlanBinder { System.out.println("Usage: ./bin/pyflink<2/3>.[sh/bat] <pathToScript>[ <pathToPackage1>[ <pathToPackageX]][ - <parameter1>[ <parameterX>]]"); return; } - usePython3 = args[0].equals(ARGUMENT_PYTHON_3); - PythonPlanBinder binder = new PythonPlanBinder(); - binder.runPlan(Arrays.copyOfRange(args, 1, args.length)); - } - public PythonPlanBinder() { - Configuration conf = GlobalConfiguration.loadConfiguration(); - FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python"); - FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3"); - FULL_PATH = FLINK_DIR != null + Configuration globalConfig = GlobalConfiguration.loadConfiguration(); + PythonPlanBinder binder = new PythonPlanBinder(globalConfig); + binder.runPlan(args); + } + + public PythonPlanBinder(Configuration globalConfig) { + String configuredPlanTmpPath = globalConfig.getString(PythonOptions.PLAN_TMP_DIR); + tmpPlanFilesDir = configuredPlanTmpPath != null + ? configuredPlanTmpPath + : System.getProperty("java.io.tmpdir") + File.separator + "flink_plan_" + r.nextInt(); + + tmpDistributedDir = globalConfig.getString(PythonOptions.DC_TMP_DIR); + + String flinkRootDir = System.getenv("FLINK_ROOT_DIR"); + pythonLibraryPath = flinkRootDir != null //command-line - ? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH + ? 
flinkRootDir + FLINK_PYTHON_REL_LOCAL_PATH //testing - : new Path(FileSystem.getLocalFileSystem().getWorkingDirectory(), "src/main/python/org/apache/flink/python/api").toString(); + : new File(System.getProperty("user.dir"), "src/main/python/org/apache/flink/python/api").getAbsolutePath(); + + operatorConfig = new Configuration(); + operatorConfig.setString(PythonOptions.PYTHON_BINARY_PATH, globalConfig.getString(PythonOptions.PYTHON_BINARY_PATH)); + String configuredTmpDataDir = globalConfig.getString(PythonOptions.DATA_TMP_DIR); + if (configuredTmpDataDir != null) { + operatorConfig.setString(PythonOptions.DATA_TMP_DIR, configuredTmpDataDir); + } + operatorConfig.setLong(PythonOptions.MMAP_FILE_SIZE, globalConfig.getLong(PythonOptions.MMAP_FILE_SIZE)); } - private void runPlan(String[] args) throws Exception { + void runPlan(String[] args) throws Exception { int split = 0; for (int x = 0; x < args.length; x++) { if (args[x].compareTo("-") == 0) { @@ -137,7 +137,7 @@ public class PythonPlanBinder { } try { - String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt(); + String tmpPath = tmpPlanFilesDir; prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split)); startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length)); @@ -145,10 +145,6 @@ public class PythonPlanBinder { while (streamer.preparePlanMode()) { receivePlan(); - if (env instanceof LocalEnvironment) { - FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink"; - } - distributeFiles(tmpPath, env); JobExecutionResult jer = env.execute(); sendResult(jer); @@ -167,8 +163,8 @@ public class PythonPlanBinder { //=====Setup======================================================================================================== /** - * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big - * package, and resolves PYTHONPATH issues. 
+ * Copies all files to a common directory {@link PythonOptions#PLAN_TMP_DIR}). This allows us to distribute it as + * one big package which resolves PYTHONPATH issues. * * @param filePaths * @throws IOException @@ -177,7 +173,7 @@ public class PythonPlanBinder { private void prepareFiles(String tempFilePath, String... filePaths) throws IOException, URISyntaxException { //Flink python package clearPath(tempFilePath); - FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false); + FileCache.copy(new Path(pythonLibraryPath), new Path(tmpPlanFilesDir), false); //plan file copyFile(filePaths[0], tempFilePath, FLINK_PYTHON_PLAN_NAME); @@ -206,17 +202,21 @@ public class PythonPlanBinder { FileCache.copy(p.makeQualified(FileSystem.get(p.toUri())), new Path(tmpFilePath), true); } - private static void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException, URISyntaxException { - clearPath(FLINK_HDFS_PATH); - FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), true); - env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID); + private void distributeFiles(String tmpPath, ExecutionEnvironment env) throws IOException { + clearPath(tmpDistributedDir); + FileCache.copy(new Path(tmpPath), new Path(tmpDistributedDir), true); + env.registerCachedFile(new Path(tmpDistributedDir).toUri().toString(), FLINK_PYTHON_DC_ID); } private void startPython(String tempPath, String[] args) throws IOException { + StringBuilder arguments = new StringBuilder(); for (String arg : args) { arguments.append(" ").append(arg); } - streamer = new PythonPlanStreamer(); + + operatorConfig.setString(PLAN_ARGUMENTS_KEY, arguments.toString()); + + streamer = new PythonPlanStreamer(operatorConfig); streamer.open(tempPath, arguments.toString()); } @@ -227,17 +227,15 @@ public class PythonPlanBinder { private void close() { try { //prevent throwing exception so that previous exceptions aren't hidden. 
- FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH)); - hdfs.delete(new Path(FLINK_HDFS_PATH), true); + FileSystem hdfs = new Path(tmpDistributedDir).getFileSystem(); + hdfs.delete(new Path(tmpDistributedDir), true); FileSystem local = FileSystem.getLocalFileSystem(); - local.delete(new Path(FLINK_PYTHON_FILE_PATH), true); - local.delete(new Path(FLINK_TMP_DATA_DIR), true); + local.delete(new Path(tmpPlanFilesDir), true); streamer.close(); } catch (NullPointerException ignored) { } catch (IOException ioe) { LOG.error("PythonAPI file cleanup failed. {}", ioe.getMessage()); - } catch (URISyntaxException use) { // can't occur } } @@ -271,7 +269,10 @@ public class PythonPlanBinder { env.setParallelism(dop); break; case MODE: - FLINK_HDFS_PATH = value.<Boolean>getField(1) ? "file:/tmp/flink" : "hdfs:/tmp/flink"; + if (value.<Boolean>getField(1)) { + LOG.info("Local execution specified, using default for {}.", PythonOptions.DC_TMP_DIR); + tmpDistributedDir = PythonOptions.DC_TMP_DIR.defaultValue(); + } break; case RETRY: int retry = value.<Integer>getField(1); @@ -527,7 +528,7 @@ public class PythonPlanBinder { DataSet<IN2> op2 = sets.getDataSet(info.otherID); Keys.ExpressionKeys<IN1> key1 = new Keys.ExpressionKeys<>(info.keys1, op1.getType()); Keys.ExpressionKeys<IN2> key2 = new Keys.ExpressionKeys<>(info.keys2, op2.getType()); - PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(info.envID, info.setID, type); + PythonCoGroup<IN1, IN2, OUT> pcg = new PythonCoGroup<>(operatorConfig, info.envID, info.setID, type); sets.add(info.setID, new CoGroupRawOperator<>(op1, op2, key1, key2, pcg, type, info.name).setParallelism(getParallelism(info))); } @@ -552,7 +553,9 @@ public class PythonPlanBinder { defaultResult.setParallelism(getParallelism(info)); if (info.usesUDF) { - sets.add(info.setID, defaultResult.mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name)); + 
sets.add(info.setID, defaultResult + .mapPartition(new PythonMapPartition<Tuple2<IN1, IN2>, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name)); } else { sets.add(info.setID, defaultResult.name("DefaultCross")); } @@ -560,12 +563,16 @@ public class PythonPlanBinder { private <IN, OUT> void createFilterOperation(PythonOperationInfo info, TypeInformation<OUT> type) { DataSet<IN> op1 = sets.getDataSet(info.parentID); - sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name)); + sets.add(info.setID, op1 + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name)); } private <IN, OUT> void createFlatMapOperation(PythonOperationInfo info, TypeInformation<OUT> type) { DataSet<IN> op1 = sets.getDataSet(info.parentID); - sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name)); + sets.add(info.setID, op1 + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name)); } private void createGroupReduceOperation(PythonOperationInfo info) { @@ -581,19 +588,22 @@ public class PythonPlanBinder { private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info)) - .mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name); + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name); } private 
<IN, OUT> DataSet<OUT> applyGroupReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep") - .mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name); + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name); } private <IN, OUT> DataSet<OUT> applyGroupReduceOperation(SortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep") - .mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name); + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name); } private <IN1, IN2, OUT> void createJoinOperation(DatasizeHint mode, PythonOperationInfo info, TypeInformation<OUT> type) { @@ -602,7 +612,8 @@ public class PythonPlanBinder { if (info.usesUDF) { sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)) - .mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name)); + .mapPartition(new PythonMapPartition<Tuple2<byte[], byte[]>, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name)); } else { sets.add(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))); } @@ -629,12 +640,16 @@ public class PythonPlanBinder { private <IN, OUT> void createMapOperation(PythonOperationInfo info, 
TypeInformation<OUT> type) { DataSet<IN> op1 = sets.getDataSet(info.parentID); - sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name)); + sets.add(info.setID, op1 + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name)); } private <IN, OUT> void createMapPartitionOperation(PythonOperationInfo info, TypeInformation<OUT> type) { DataSet<IN> op1 = sets.getDataSet(info.parentID); - sets.add(info.setID, op1.mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name)); + sets.add(info.setID, op1 + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name)); } private void createReduceOperation(PythonOperationInfo info) { @@ -650,12 +665,14 @@ public class PythonPlanBinder { private <IN, OUT> DataSet<OUT> applyReduceOperation(DataSet<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep") - .mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name); + .mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name); } private <IN, OUT> DataSet<OUT> applyReduceOperation(UnsortedGrouping<IN> op1, PythonOperationInfo info, TypeInformation<OUT> type) { return op1 .reduceGroup(new IdentityGroupReduce<IN>()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep") - .mapPartition(new PythonMapPartition<IN, OUT>(info.envID, info.setID, type)).setParallelism(getParallelism(info)).name(info.name); + 
.mapPartition(new PythonMapPartition<IN, OUT>(operatorConfig, info.envID, info.setID, type)) + .setParallelism(getParallelism(info)).name(info.name); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java index ff5a8d4..a5e3e75 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java @@ -35,9 +35,9 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, private final PythonDualInputStreamer<IN1, IN2, OUT> streamer; private final transient TypeInformation<OUT> typeInformation; - public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) { + public PythonCoGroup(Configuration config, int envID, int setID, TypeInformation<OUT> typeInformation) { this.typeInformation = typeInformation; - streamer = new PythonDualInputStreamer<>(this, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo); + streamer = new PythonDualInputStreamer<>(this, config, envID, setID, typeInformation instanceof PrimitiveArrayTypeInfo); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java 
index 9142581..207ead9 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java @@ -35,9 +35,9 @@ public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OU private final PythonSingleInputStreamer<IN, OUT> streamer; private final transient TypeInformation<OUT> typeInformation; - public PythonMapPartition(int envId, int setId, TypeInformation<OUT> typeInformation) { + public PythonMapPartition(Configuration config, int envId, int setId, TypeInformation<OUT> typeInformation) { this.typeInformation = typeInformation; - streamer = new PythonSingleInputStreamer<>(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo); + streamer = new PythonSingleInputStreamer<>(this, config, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java index 3b8e423..a16f522 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java @@ -17,6 +17,8 @@ */ package org.apache.flink.python.api.streaming.data; +import org.apache.flink.configuration.Configuration; + import java.io.IOException; /** @@ -32,6 +34,10 @@ public class PythonDualInputSender<IN1, IN2> extends PythonSender { private transient Serializer<IN1> 
serializer1; private transient Serializer<IN2> serializer2; + protected PythonDualInputSender(Configuration config) { + super(config); + } + /** * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java index 2e9ba2c..b7e8a25 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java @@ -18,6 +18,7 @@ package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import java.io.IOException; @@ -35,8 +36,8 @@ public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<Pytho private static final long serialVersionUID = -607175070491761873L; - public PythonDualInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) { - super(function, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>()); + public PythonDualInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) { + super(function, config, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>(config)); } /** 
http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java index 838a261..c7c1f7a 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java @@ -20,8 +20,9 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR; -import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.python.api.PythonOptions; import org.apache.flink.util.Collector; /** @@ -30,43 +31,40 @@ import org.apache.flink.util.Collector; public class PythonReceiver<OUT> implements Serializable { private static final long serialVersionUID = -2474088929850009968L; - private transient File inputFile; private transient RandomAccessFile inputRAF; private transient FileChannel inputChannel; private transient MappedByteBuffer fileBuffer; + private final long mappedFileSizeBytes; + private final boolean readAsByteArray; private transient Deserializer<OUT> deserializer; - public PythonReceiver(boolean usesByteArray) { + public PythonReceiver(Configuration config, boolean usesByteArray) { readAsByteArray = usesByteArray; + mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10; } 
//=====Setup======================================================================================================== @SuppressWarnings("unchecked") - public void open(String path) throws IOException { - setupMappedFile(path); + public void open(File inputFile) throws IOException { deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer()); - } - private void setupMappedFile(String inputFilePath) throws IOException { - File x = new File(FLINK_TMP_DATA_DIR); - x.mkdirs(); + inputFile.getParentFile().mkdirs(); - inputFile = new File(inputFilePath); if (inputFile.exists()) { inputFile.delete(); } inputFile.createNewFile(); - inputRAF = new RandomAccessFile(inputFilePath, "rw"); - inputRAF.setLength(MAPPED_FILE_SIZE); - inputRAF.seek(MAPPED_FILE_SIZE - 1); + inputRAF = new RandomAccessFile(inputFile, "rw"); + inputRAF.setLength(mappedFileSizeBytes); + inputRAF.seek(mappedFileSizeBytes - 1); inputRAF.writeByte(0); inputRAF.seek(0); inputChannel = inputRAF.getChannel(); - fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE); + fileBuffer = inputChannel.map(FileChannel.MapMode.READ_WRITE, 0, mappedFileSizeBytes); } public void close() throws IOException { @@ -76,7 +74,6 @@ public class PythonReceiver<OUT> implements Serializable { private void closeMappedFile() throws IOException { inputChannel.close(); inputRAF.close(); - inputFile.delete(); } http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java index 9ada758..3d13271 100644 --- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java @@ -14,6 +14,8 @@ package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.python.api.PythonOptions; import java.io.File; import java.io.IOException; @@ -23,9 +25,6 @@ import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR; -import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE; - /** * General-purpose class to write data to memory-mapped files. */ @@ -37,32 +36,36 @@ public abstract class PythonSender implements Serializable { public static final byte TYPE_KEY_VALUE = 62; public static final byte TYPE_VALUE_VALUE = 61; - private transient File outputFile; private transient RandomAccessFile outputRAF; private transient FileChannel outputChannel; private transient MappedByteBuffer fileBuffer; - //=====Setup======================================================================================================== - public void open(String path) throws IOException { - setupMappedFile(path); + private final long mappedFileSizeBytes; + + private final Configuration config; + + protected PythonSender(Configuration config) { + this.config = config; + mappedFileSizeBytes = config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10; } - private void setupMappedFile(String outputFilePath) throws IOException { - File x = new File(FLINK_TMP_DATA_DIR); - x.mkdirs(); + //=====Setup======================================================================================================== + public void open(File outputFile) throws IOException { + outputFile.mkdirs(); - 
outputFile = new File(outputFilePath); if (outputFile.exists()) { outputFile.delete(); } outputFile.createNewFile(); - outputRAF = new RandomAccessFile(outputFilePath, "rw"); - outputRAF.setLength(MAPPED_FILE_SIZE); - outputRAF.seek(MAPPED_FILE_SIZE - 1); + outputRAF = new RandomAccessFile(outputFile, "rw"); + + + outputRAF.setLength(mappedFileSizeBytes); + outputRAF.seek(mappedFileSizeBytes - 1); outputRAF.writeByte(0); outputRAF.seek(0); outputChannel = outputRAF.getChannel(); - fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAPPED_FILE_SIZE); + fileBuffer = outputChannel.map(FileChannel.MapMode.READ_WRITE, 0, mappedFileSizeBytes); } public void close() throws IOException { @@ -72,7 +75,6 @@ public abstract class PythonSender implements Serializable { private void closeMappedFile() throws IOException { outputChannel.close(); outputRAF.close(); - outputFile.delete(); } //=====IO=========================================================================================================== @@ -92,7 +94,7 @@ public abstract class PythonSender implements Serializable { while (input.hasNext()) { IN value = input.next(); ByteBuffer bb = serializer.serialize(value); - if (bb.remaining() > MAPPED_FILE_SIZE) { + if (bb.remaining() > mappedFileSizeBytes) { throw new RuntimeException("Serialized object does not fit into a single buffer."); } if (bb.remaining() <= fileBuffer.remaining()) { http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java index 42a1799..74d0604 100644 --- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java @@ -17,6 +17,8 @@ */ package org.apache.flink.python.api.streaming.data; +import org.apache.flink.configuration.Configuration; + import java.io.IOException; /** @@ -30,6 +32,10 @@ public class PythonSingleInputSender<IN> extends PythonSender { private transient Serializer<IN> serializer; + protected PythonSingleInputSender(Configuration config) { + super(config); + } + /** * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java index d013111..e7f018c 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java @@ -18,6 +18,7 @@ package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.common.functions.AbstractRichFunction; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; import java.io.IOException; @@ -33,8 +34,8 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin private static final long 
serialVersionUID = -5149905918522069034L; - public PythonSingleInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) { - super(function, envID, setID, usesByteArray, new PythonSingleInputSender<IN>()); + public PythonSingleInputStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray) { + super(function, config, envID, setID, usesByteArray, new PythonSingleInputSender<IN>(config)); } /** @@ -83,7 +84,9 @@ public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSin } } } catch (SocketTimeoutException ignored) { - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg); + throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg.get()); + } catch (Exception e) { + throw new RuntimeException("Critical failure. " + msg.get(), e); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 006a1b2..97d5780 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -15,7 +15,7 @@ package org.apache.flink.python.api.streaming.data; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import 
org.apache.flink.python.api.PythonPlanBinder; +import org.apache.flink.python.api.PythonOptions; import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer; import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer; import org.apache.flink.python.api.streaming.util.StreamPrinter; @@ -24,6 +24,7 @@ import org.slf4j.LoggerFactory; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.io.Serializable; @@ -34,13 +35,11 @@ import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.concurrent.atomic.AtomicReference; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH; import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID; import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR; import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_COUNT; import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCVAR_NAME_PREFIX; +import static org.apache.flink.python.api.PythonPlanBinder.PLAN_ARGUMENTS_KEY; /** * This streamer is used by functions to send/receive data to/from an external python process. 
@@ -56,10 +55,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable protected static final int SIGNAL_ERROR = -2; protected static final byte SIGNAL_LAST = 32; + private final Configuration config; private final int envID; private final int setID; - private final boolean usePython3; - private final String planArguments; private transient Process process; private transient Thread shutdownThread; @@ -79,12 +77,11 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable protected transient Thread outPrinter; protected transient Thread errorPrinter; - public PythonStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray, S sender) { + public PythonStreamer(AbstractRichFunction function, Configuration config, int envID, int setID, boolean usesByteArray, S sender) { + this.config = config; this.envID = envID; this.setID = setID; - this.usePython3 = PythonPlanBinder.usePython3; - planArguments = PythonPlanBinder.arguments.toString(); - receiver = new PythonReceiver<>(usesByteArray); + this.receiver = new PythonReceiver<>(config, usesByteArray); this.function = function; this.sender = sender; } @@ -101,16 +98,21 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable } private void startPython() throws IOException { - String outputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output"; - String inputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input"; + String tmpDir = config.getString(PythonOptions.DATA_TMP_DIR); + if (tmpDir == null) { + tmpDir = System.getProperty("java.io.tmpdir"); + } + File outputFile = new File(tmpDir, envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "_output"); + File inputFile = new File(tmpDir, envID + "_" + setID + 
this.function.getRuntimeContext().getIndexOfThisSubtask() + "_input"); - sender.open(inputFilePath); - receiver.open(outputFilePath); + sender.open(inputFile); + receiver.open(outputFile); String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath(); + String planPath = path + FLINK_PYTHON_PLAN_NAME; - String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; + String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH); try { Runtime.getRuntime().exec(pythonBinaryPath); @@ -118,7 +120,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); } - process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments); + process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + config.getString(PLAN_ARGUMENTS_KEY, "")); outPrinter = new Thread(new StreamPrinter(process.getInputStream())); outPrinter.start(); errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg)); @@ -143,8 +145,9 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n") .getBytes(ConfigConstants.DEFAULT_CHARSET)); - processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); - processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); + processOutput.write(((config.getLong(PythonOptions.MMAP_FILE_SIZE) << 10) + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); + processOutput.write((inputFile + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); + processOutput.write((outputFile + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); processOutput.flush(); 
while (true) { http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java index 9b62563..9e93dda 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java @@ -13,6 +13,8 @@ package org.apache.flink.python.api.streaming.plan; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.python.api.PythonOptions; import org.apache.flink.python.api.streaming.util.StreamPrinter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,10 +24,7 @@ import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH; -import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH; import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME; -import static org.apache.flink.python.api.PythonPlanBinder.usePython3; /** * Generic class to exchange data during the plan phase. 
@@ -33,6 +32,7 @@ import static org.apache.flink.python.api.PythonPlanBinder.usePython3; public class PythonPlanStreamer { protected static final Logger LOG = LoggerFactory.getLogger(PythonPlanStreamer.class); + private final Configuration config; protected PythonPlanSender sender; protected PythonPlanReceiver receiver; @@ -40,6 +40,10 @@ public class PythonPlanStreamer { private Process process; private ServerSocket server; private Socket socket; + + public PythonPlanStreamer(Configuration config) { + this.config = config; + } public Object getRecord() throws IOException { return getRecord(false); @@ -58,7 +62,7 @@ public class PythonPlanStreamer { } private void startPython(String tmpPath, String args) throws IOException { - String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH; + String pythonBinaryPath = config.getString(PythonOptions.PYTHON_BINARY_PATH); try { Runtime.getRuntime().exec(pythonBinaryPath); http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py index 293f5e9..09e6b36 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/connection/Connection.py @@ -23,8 +23,6 @@ import sys PY2 = sys.version_info[0] == 2 PY3 = sys.version_info[0] == 3 -MAPPED_FILE_SIZE = 1024 * 1024 * 64 - SIGNAL_REQUEST_BUFFER = b"\x00\x00\x00\x00" SIGNAL_REQUEST_BUFFER_G0 = b"\xFF\xFF\xFF\xFD" SIGNAL_REQUEST_BUFFER_G1 = b"\xFF\xFF\xFF\xFC" @@ -67,15 +65,16 @@ class PureTCPConnection(object): class 
BufferingTCPMappedFileConnection(object): - def __init__(self, input_file, output_file, port): + def __init__(self, input_file, output_file, mmap_size, port): self._input_file = open(input_file, "rb+") self._output_file = open(output_file, "rb+") + self._mmap_size = mmap_size if hasattr(mmap, 'MAP_SHARED'): - self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ) - self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE) + self._file_input_buffer = mmap.mmap(self._input_file.fileno(), mmap_size, mmap.MAP_SHARED, mmap.ACCESS_READ) + self._file_output_buffer = mmap.mmap(self._output_file.fileno(), mmap_size, mmap.MAP_SHARED, mmap.ACCESS_WRITE) else: - self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_READ) - self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, None, mmap.ACCESS_WRITE) + self._file_input_buffer = mmap.mmap(self._input_file.fileno(), mmap_size, None, mmap.ACCESS_READ) + self._file_output_buffer = mmap.mmap(self._output_file.fileno(), mmap_size, None, mmap.ACCESS_WRITE) self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM) self._socket.connect((SOCKET.gethostbyname("localhost"), port)) @@ -92,10 +91,10 @@ class BufferingTCPMappedFileConnection(object): def write(self, msg): length = len(msg) - if length > MAPPED_FILE_SIZE: + if length > self._mmap_size: raise Exception("Serialized object does not fit into a single buffer.") tmp = self._out_size + length - if tmp > MAPPED_FILE_SIZE: + if tmp > self._mmap_size: self._write_buffer() self.write(msg) else: @@ -150,8 +149,8 @@ class BufferingTCPMappedFileConnection(object): class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection): - def __init__(self, input_file, output_file, port): - super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, 
port) + def __init__(self, input_file, output_file, mmap_size, port): + super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, mmap_size, port) self._input = [b"", b""] self._input_offset = [0, 0] self._input_size = [0, 0] http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py index 83f563b..edd2c61 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/CoGroupFunction.py @@ -25,8 +25,8 @@ class CoGroupFunction(Function.Function): self._keys1 = None self._keys2 = None - def _configure(self, input_file, output_file, port, env, info, subtask_index): - self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, port) + def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index): + self._connection = Connection.TwinBufferingTCPMappedFileConnection(input_file, output_file, mmap_size, port) self._iterator = Iterator.Iterator(self._connection, env, 0) self._iterator2 = Iterator.Iterator(self._connection, env, 1) self._cgiter = Iterator.CoGroupIterator(self._iterator, self._iterator2, self._keys1, self._keys2) http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py index 45a0f2e..a70a359 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/Function.py @@ -32,8 +32,8 @@ class Function(object): self.context = None self._env = None - def _configure(self, input_file, output_file, port, env, info, subtask_index): - self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port) + def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index): + self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, mmap_size, port) self._iterator = Iterator.Iterator(self._connection, env) self._collector = Collector.Collector(self._connection, env, info) self.context = RuntimeContext.RuntimeContext(self._iterator, self._collector, subtask_index) http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py index 77b53a2..5a353a1 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/GroupReduceFunction.py @@ -25,8 +25,8 @@ class GroupReduceFunction(Function.Function): def __init__(self): super(GroupReduceFunction, self).__init__() - 
def _configure(self, input_file, output_file, port, env, info, subtask_index): - super(GroupReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index) + def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index): + super(GroupReduceFunction, self)._configure(input_file, output_file, mmap_size, port, env, info, subtask_index) if len(info.key1) == 0: self._run = self._run_all_group_reduce else: http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py index 08af276..5acabe0 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/functions/ReduceFunction.py @@ -24,8 +24,8 @@ class ReduceFunction(Function.Function): def __init__(self): super(ReduceFunction, self).__init__() - def _configure(self, input_file, output_file, port, env, info, subtask_index): - super(ReduceFunction, self)._configure(input_file, output_file, port, env, info, subtask_index) + def _configure(self, input_file, output_file, mmap_size, port, env, info, subtask_index): + super(ReduceFunction, self)._configure(input_file, output_file, mmap_size, port, env, info, subtask_index) if len(info.key1) == 0: self._run = self._run_all_reduce else: http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index 6e496de..797ae96 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -234,6 +234,7 @@ class Environment(object): port = int(sys.stdin.readline().rstrip('\n')) subtask_index = int(sys.stdin.readline().rstrip('\n')) + mmap_size = int(sys.stdin.readline().rstrip('\n')) input_path = sys.stdin.readline().rstrip('\n') output_path = sys.stdin.readline().rstrip('\n') @@ -244,7 +245,7 @@ class Environment(object): if set.id == id: used_set = set operator = set.operator - operator._configure(input_path, output_path, port, self, used_set, subtask_index) + operator._configure(input_path, output_path, mmap_size, port, self, used_set, subtask_index) operator._go() operator._close() sys.stdout.flush() @@ -252,7 +253,7 @@ class Environment(object): except: sys.stdout.flush() sys.stderr.flush() - if operator is not None: + if operator is not None and operator._connection is not None: operator._connection._socket.send(struct.pack(">i", -2)) elif port is not None: socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM) http://git-wip-us.apache.org/repos/asf/flink/blob/bdcebfda/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java index d144298..ba8ea78 100644 --- a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java +++ 
b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/PythonPlanBinderTest.java @@ -12,6 +12,7 @@ */ package org.apache.flink.python.api; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -21,9 +22,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_2; -import static org.apache.flink.python.api.PythonPlanBinder.ARGUMENT_PYTHON_3; - public class PythonPlanBinderTest extends JavaProgramTestBase { @Override @@ -75,12 +73,16 @@ public class PythonPlanBinderTest extends JavaProgramTestBase { String utils = findUtilsFile(); if (isPython2Supported()) { for (String file : findTestFiles()) { - PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_2, file, utils}); + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, "python"); + new PythonPlanBinder(configuration).runPlan(new String[]{file, utils}); } } if (isPython3Supported()) { for (String file : findTestFiles()) { - PythonPlanBinder.main(new String[]{ARGUMENT_PYTHON_3, file, utils}); + Configuration configuration = new Configuration(); + configuration.setString(PythonOptions.PYTHON_BINARY_PATH, "python3"); + new PythonPlanBinder(configuration).runPlan(new String[]{file, utils}); } } }