This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5f99c42d27ad6b42df70c16810029ade365c9495 Author: huangxingbo <hxbks...@gmail.com> AuthorDate: Mon Aug 16 10:35:58 2021 +0800 [FLINK-22871][python] Support to specify python client interpreter used to compile jobs This closes #16833. --- docs/content.zh/docs/deployment/cli.md | 9 ++++ docs/content/docs/deployment/cli.md | 9 ++++ .../shortcodes/generated/python_configuration.html | 2 +- .../apache/flink/client/cli/CliFrontendParser.java | 13 ++++++ .../flink/client/cli/ProgramOptionsUtils.java | 4 +- .../apache/flink/client/python/PythonEnvUtils.java | 53 ++++++++++++++++++++++ .../org/apache/flink/python/PythonOptions.java | 5 +- .../flink/python/util/PythonDependencyUtils.java | 7 +++ .../flink/client/python/PythonEnvUtilsTest.java | 27 +++++++++++ .../flink/table/client/cli/CliOptionsParser.java | 4 ++ 10 files changed, 128 insertions(+), 5 deletions(-) diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md index aa2a017..f9c05dc 100644 --- a/docs/content.zh/docs/deployment/cli.md +++ b/docs/content.zh/docs/deployment/cli.md @@ -480,6 +480,15 @@ related options. Here's an overview of all the Python related options for the ac </td> </tr> <tr> + <td><code class="highlighter-rouge">-pyclientexec,--pyClientExecutable</code></td> + <td> + The path of the Python interpreter used to launch the Python process when submitting + the Python jobs via \"flink run\" or compiling the Java/Scala jobs containing + Python UDFs. + (e.g., --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python) + </td> + </tr> + <tr> <td><code class="highlighter-rouge">-pyexec,--pyExecutable</code></td> <td> Specify the path of the python interpreter used to execute the python UDF worker diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md index d1edb69..6f1f041 100644 --- a/docs/content/docs/deployment/cli.md +++ b/docs/content/docs/deployment/cli.md @@ -478,6 +478,15 @@ related options. Here's an overview of all the Python related options for the ac </td> </tr> <tr> + <td><code class="highlighter-rouge">-pyclientexec,--pyClientExecutable</code></td> + <td> + The path of the Python interpreter used to launch the Python process when submitting + the Python jobs via \"flink run\" or compiling the Java/Scala jobs containing + Python UDFs. + (e.g., --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python) + </td> + </tr> + <tr> <td><code class="highlighter-rouge">-pyexec,--pyExecutable</code></td> <td> Specify the path of the python interpreter used to execute the python UDF worker diff --git a/docs/layouts/shortcodes/generated/python_configuration.html b/docs/layouts/shortcodes/generated/python_configuration.html index b600141..bb250a3 100644 --- a/docs/layouts/shortcodes/generated/python_configuration.html +++ b/docs/layouts/shortcodes/generated/python_configuration.html @@ -18,7 +18,7 @@ <td><h5>python.client.executable</h5></td> <td style="word-wrap: break-word;">"python"</td> <td>String</td> - <td>The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs. Equivalent to the environment variable PYFLINK_CLIENT_EXECUTABLE. The priority is as following: <br />1. the configuration 'python.client.executable' defined in the source code;<br />2. the environment variable PYFLINK_CLIENT_EXECUTABLE;<br />3. the configuration 'python.client.executable' defined in [...] + <td>The path of the Python interpreter used to launch the Python process when submitting the Python jobs via "flink run" or compiling the Java/Scala jobs containing Python UDFs. Equivalent to the command line option "-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The priority is as following: <br />1. the command line option "-pyclientexec";<br />2. the environment variable PYFLINK_CLIENT_EXECUTABLE;<br />3. the configuration 'python.client.executable' [...] </tr> <tr> <td><h5>python.executable</h5></td> diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index de41b04..3a779b5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -244,6 +244,15 @@ public class CliFrontendParser { + "Pip (version >= 7.1.0) and SetupTools (version >= 37.0.0). " + "Please ensure that the specified environment meets the above requirements."); + public static final Option PYCLIENTEXEC_OPTION = + new Option( + "pyclientexec", + "pyClientExecutable", + true, + "The path of the Python interpreter used to launch the Python " + + "process when submitting the Python jobs via \"flink run\" or compiling " + + "the Java/Scala jobs containing Python UDFs."); + static { HELP_OPTION.setRequired(false); @@ -305,6 +314,8 @@ public class CliFrontendParser { PYARCHIVE_OPTION.setRequired(false); PYEXEC_OPTION.setRequired(false); + + PYCLIENTEXEC_OPTION.setRequired(false); } static final Options RUN_OPTIONS = getRunCommandOptions(); @@ -331,6 +342,7 @@ public class CliFrontendParser { options.addOption(PYREQUIREMENTS_OPTION); options.addOption(PYARCHIVE_OPTION); options.addOption(PYEXEC_OPTION); + options.addOption(PYCLIENTEXEC_OPTION); return options; } @@ -346,6 +358,7 @@ public class CliFrontendParser { options.addOption(PYREQUIREMENTS_OPTION); options.addOption(PYARCHIVE_OPTION); options.addOption(PYEXEC_OPTION); + options.addOption(PYCLIENTEXEC_OPTION); return options; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java index 523dd42..4296ad2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java @@ -36,6 +36,7 @@ import java.net.URLClassLoader; import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION; @@ -67,7 +68,8 @@ public enum ProgramOptionsUtils { return line.hasOption(PYFILES_OPTION.getOpt()) || line.hasOption(PYREQUIREMENTS_OPTION.getOpt()) || line.hasOption(PYARCHIVE_OPTION.getOpt()) - || line.hasOption(PYEXEC_OPTION.getOpt()); + || line.hasOption(PYEXEC_OPTION.getOpt()) + || line.hasOption(PYCLIENTEXEC_OPTION.getOpt()); } public static ProgramOptions createPythonProgramOptions(CommandLine line) diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java index f407c6c..254c8b5 100644 --- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java +++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java @@ -22,11 +22,14 @@ import org.apache.flink.client.deployment.application.UnsuccessfulExecutionExcep import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; +import org.apache.flink.python.util.PythonDependencyUtils; +import org.apache.flink.python.util.ZipUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; import org.apache.flink.shaded.guava30.com.google.common.base.Strings; @@ -64,6 +67,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES; import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE; import static org.apache.flink.python.PythonOptions.PYTHON_FILES; import static org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER; @@ -72,6 +76,8 @@ import static org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER; final class PythonEnvUtils { private static final Logger LOG = LoggerFactory.getLogger(PythonEnvUtils.class); + private static final String PYTHON_ARCHIVES_DIR = "python-archives"; + static final String PYFLINK_CLIENT_EXECUTABLE = "PYFLINK_CLIENT_EXECUTABLE"; static final long CHECK_INTERVAL = 100; @@ -86,6 +92,8 @@ final class PythonEnvUtils { static class PythonEnvironment { String tempDirectory; + String archivesDirectory; + String pythonExec = OperatingSystem.isWindows() ? "python.exe" : "python"; String pythonPath; @@ -137,6 +145,48 @@ final class PythonEnvUtils { .collect(Collectors.toList()); addToPythonPath(env, pythonFiles); } + + // 5. set the archives directory as the working directory, then user could access the + // content of the archives via relative path + if (config.getOptional(PYTHON_ARCHIVES).isPresent() + && (config.getOptional(PYTHON_CLIENT_EXECUTABLE).isPresent() + || !StringUtils.isNullOrWhitespaceOnly( + System.getenv(PYFLINK_CLIENT_EXECUTABLE)))) { + env.archivesDirectory = String.join(File.separator, tmpDir, PYTHON_ARCHIVES_DIR); + + // extract archives to archives directory + config.getOptional(PYTHON_ARCHIVES) + .ifPresent( + pyArchives -> { + for (String archive : pyArchives.split(FILE_DELIMITER)) { + Path archivePath; + String targetDir; + if (archive.contains(PythonDependencyUtils.PARAM_DELIMITER)) { + String[] filePathAndTargetDir = + archive.split( + PythonDependencyUtils.PARAM_DELIMITER, 2); + archivePath = new Path(filePathAndTargetDir[0]); + targetDir = filePathAndTargetDir[1]; + } else { + archivePath = new Path(archive); + targetDir = archivePath.getName(); + } + try { + ZipUtils.extractZipFileWithPermissions( + archivePath.getPath(), + String.join( + File.separator, + env.archivesDirectory, + targetDir)); + } catch (IOException e) { + throw new RuntimeException( + "Extract archives to archives directory failed.", + e); + } + } + }); + } + if (entryPointScript != null) { addToPythonPath(env, Collections.singletonList(new Path(entryPointScript))); } @@ -269,6 +319,9 @@ final class PythonEnvUtils { String.join(File.pathSeparator, pythonEnv.pythonPath, defaultPythonPath)); } } + if (pythonEnv.archivesDirectory != null) { + pythonProcessBuilder.directory(new File(pythonEnv.archivesDirectory)); + } pythonEnv.systemEnv.forEach(env::put); commands.add(0, pythonEnv.pythonExec); pythonProcessBuilder.command(commands); diff --git a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java index 8055efe..e2a8e21 100644 --- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java +++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java @@ -145,11 +145,10 @@ public class PythonOptions { .text( "The path of the Python interpreter used to launch the Python process when submitting the " + "Python jobs via \"flink run\" or compiling the Java/Scala jobs containing Python UDFs. " - + "Equivalent to the environment variable PYFLINK_CLIENT_EXECUTABLE. " + + "Equivalent to the command line option \"-pyclientexec\" or the environment variable PYFLINK_CLIENT_EXECUTABLE. " + "The priority is as following: ") .linebreak() - .text( - "1. the configuration 'python.client.executable' defined in the source code;") + .text("1. the command line option \"-pyclientexec\";") .linebreak() .text("2. the environment variable PYFLINK_CLIENT_EXECUTABLE;") .linebreak() diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java index 60c1553..c5f8f82 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java @@ -43,6 +43,7 @@ import java.util.Map; import java.util.stream.Collectors; import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION; @@ -111,6 +112,12 @@ public class PythonDependencyUtils { PythonOptions.PYTHON_EXECUTABLE, commandLine.getOptionValue(PYEXEC_OPTION.getOpt())); } + if (commandLine.hasOption(PYCLIENTEXEC_OPTION.getOpt())) { + config.set( + PythonOptions.PYTHON_CLIENT_EXECUTABLE, + commandLine.getOptionValue(PYCLIENTEXEC_OPTION.getOpt())); + } + return config; } diff --git a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java index 6105d5c..f94f1c0 100644 --- a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java +++ b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java @@ -50,6 +50,7 @@ import java.util.stream.Collectors; import static org.apache.flink.client.python.PythonEnvUtils.PYFLINK_CLIENT_EXECUTABLE; import static org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment; +import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES; import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE; import static org.apache.flink.python.PythonOptions.PYTHON_FILES; import static org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER; @@ -179,6 +180,14 @@ public class PythonEnvUtilsTest { public void testSetPythonExecutable() throws IOException { Configuration config = new Configuration(); + File zipFile = new File(tmpDirPath + File.separator + "venv.zip"); + try (ZipArchiveOutputStream zipOut = + new ZipArchiveOutputStream(new FileOutputStream(zipFile))) { + ZipArchiveEntry entry = new ZipArchiveEntry("zipDir" + "/zipfile0"); + zipOut.putArchiveEntry(entry); + zipOut.write(new byte[] {1, 1, 1, 1, 1}); + zipOut.closeArchiveEntry(); + } PythonEnvUtils.PythonEnvironment env = preparePythonEnvironment(config, null, tmpDirPath); if (OperatingSystem.isWindows()) { Assert.assertEquals("python.exe", env.pythonExec); @@ -197,6 +206,24 @@ public class PythonEnvUtilsTest { CommonTestUtils.setEnv(systemEnv); } + config.setString(PYTHON_ARCHIVES, zipFile.getPath()); + systemEnv = new HashMap<>(System.getenv()); + systemEnv.put(PYFLINK_CLIENT_EXECUTABLE, "venv.zip/venv/bin/python"); + CommonTestUtils.setEnv(systemEnv); + try { + env = preparePythonEnvironment(config, null, tmpDirPath); + Assert.assertEquals("venv.zip/venv/bin/python", env.pythonExec); + } finally { + systemEnv.remove(PYFLINK_CLIENT_EXECUTABLE); + CommonTestUtils.setEnv(systemEnv); + } + java.nio.file.Path[] files = + FileUtils.listDirectory(new File(env.archivesDirectory).toPath()); + Assert.assertEquals(files.length, 1); + Assert.assertEquals(files[0].getFileName().toString(), zipFile.getName()); + + config.removeConfig(PYTHON_ARCHIVES); + config.set(PYTHON_CLIENT_EXECUTABLE, "/usr/bin/python"); env = preparePythonEnvironment(config, null, tmpDirPath); Assert.assertEquals("/usr/bin/python", env.pythonExec); diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java index 00993be..e8da0a9 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.stream.Collectors; import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION; +import static org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION; @@ -160,6 +161,7 @@ public class CliOptionsParser { options.addOption(PYREQUIREMENTS_OPTION); options.addOption(PYARCHIVE_OPTION); options.addOption(PYEXEC_OPTION); + options.addOption(PYCLIENTEXEC_OPTION); return options; } @@ -172,6 +174,7 @@ public class CliOptionsParser { options.addOption(PYREQUIREMENTS_OPTION); options.addOption(PYARCHIVE_OPTION); options.addOption(PYEXEC_OPTION); + options.addOption(PYCLIENTEXEC_OPTION); return options; } @@ -183,6 +186,7 @@ public class CliOptionsParser { options.addOption(PYREQUIREMENTS_OPTION); options.addOption(PYARCHIVE_OPTION); options.addOption(PYEXEC_OPTION); + options.addOption(PYCLIENTEXEC_OPTION); return options; }