This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f99c42d27ad6b42df70c16810029ade365c9495
Author: huangxingbo <hxbks...@gmail.com>
AuthorDate: Mon Aug 16 10:35:58 2021 +0800

    [FLINK-22871][python] Support to specify python client interpreter used to compile jobs
    
    This closes #16833.
---
 docs/content.zh/docs/deployment/cli.md             |  9 ++++
 docs/content/docs/deployment/cli.md                |  9 ++++
 .../shortcodes/generated/python_configuration.html |  2 +-
 .../apache/flink/client/cli/CliFrontendParser.java | 13 ++++++
 .../flink/client/cli/ProgramOptionsUtils.java      |  4 +-
 .../apache/flink/client/python/PythonEnvUtils.java | 53 ++++++++++++++++++++++
 .../org/apache/flink/python/PythonOptions.java     |  5 +-
 .../flink/python/util/PythonDependencyUtils.java   |  7 +++
 .../flink/client/python/PythonEnvUtilsTest.java    | 27 +++++++++++
 .../flink/table/client/cli/CliOptionsParser.java   |  4 ++
 10 files changed, 128 insertions(+), 5 deletions(-)

diff --git a/docs/content.zh/docs/deployment/cli.md 
b/docs/content.zh/docs/deployment/cli.md
index aa2a017..f9c05dc 100644
--- a/docs/content.zh/docs/deployment/cli.md
+++ b/docs/content.zh/docs/deployment/cli.md
@@ -480,6 +480,15 @@ related options. Here's an overview of all the Python 
related options for the ac
             </td>
         </tr>
         <tr>
+            <td><code class="highlighter-rouge">-pyclientexec,--pyClientExecutable</code></td>
+            <td>
+                The path of the Python interpreter used to launch the Python process when submitting
+                the Python jobs via "flink run" or compiling the Java/Scala jobs containing
+                Python UDFs.
+                (e.g., --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python)
+            </td>
+        </tr>
+        <tr>
             <td><code 
class="highlighter-rouge">-pyexec,--pyExecutable</code></td>
             <td>
                 Specify the path of the python interpreter used to execute the 
python UDF worker
diff --git a/docs/content/docs/deployment/cli.md 
b/docs/content/docs/deployment/cli.md
index d1edb69..6f1f041 100644
--- a/docs/content/docs/deployment/cli.md
+++ b/docs/content/docs/deployment/cli.md
@@ -478,6 +478,15 @@ related options. Here's an overview of all the Python 
related options for the ac
             </td>
         </tr>
         <tr>
+            <td><code class="highlighter-rouge">-pyclientexec,--pyClientExecutable</code></td>
+            <td>
+                The path of the Python interpreter used to launch the Python process when submitting
+                the Python jobs via "flink run" or compiling the Java/Scala jobs containing
+                Python UDFs.
+                (e.g., --pyArchives file:///tmp/py37.zip --pyClientExecutable py37.zip/py37/python)
+            </td>
+        </tr>
+        <tr>
             <td><code 
class="highlighter-rouge">-pyexec,--pyExecutable</code></td>
             <td>
                 Specify the path of the python interpreter used to execute the 
python UDF worker
diff --git a/docs/layouts/shortcodes/generated/python_configuration.html 
b/docs/layouts/shortcodes/generated/python_configuration.html
index b600141..bb250a3 100644
--- a/docs/layouts/shortcodes/generated/python_configuration.html
+++ b/docs/layouts/shortcodes/generated/python_configuration.html
@@ -18,7 +18,7 @@
             <td><h5>python.client.executable</h5></td>
             <td style="word-wrap: break-word;">"python"</td>
             <td>String</td>
-            <td>The path of the Python interpreter used to launch the Python 
process when submitting the Python jobs via "flink run" or compiling the 
Java/Scala jobs containing Python UDFs. Equivalent to the environment variable 
PYFLINK_CLIENT_EXECUTABLE. The priority is as following: <br />1. the 
configuration 'python.client.executable' defined in the source code;<br />2. 
the environment variable PYFLINK_CLIENT_EXECUTABLE;<br />3. the configuration 
'python.client.executable' defined in  [...]
+            <td>The path of the Python interpreter used to launch the Python 
process when submitting the Python jobs via "flink run" or compiling the 
Java/Scala jobs containing Python UDFs. Equivalent to the command line option 
"-pyclientexec" or the environment variable PYFLINK_CLIENT_EXECUTABLE. The 
priority is as following: <br />1. the command line option "-pyclientexec";<br 
/>2. the environment variable PYFLINK_CLIENT_EXECUTABLE;<br />3. the 
configuration 'python.client.executable'  [...]
         </tr>
         <tr>
             <td><h5>python.executable</h5></td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index de41b04..3a779b5 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -244,6 +244,15 @@ public class CliFrontendParser {
                             + "Pip (version >= 7.1.0) and SetupTools (version 
>= 37.0.0). "
                             + "Please ensure that the specified environment 
meets the above requirements.");
 
+    public static final Option PYCLIENTEXEC_OPTION =
+            new Option(
+                    "pyclientexec",
+                    "pyClientExecutable",
+                    true,
+                    "The path of the Python interpreter used to launch the 
Python "
+                            + "process when submitting the Python jobs via 
\"flink run\" or compiling "
+                            + "the Java/Scala jobs containing Python UDFs.");
+
     static {
         HELP_OPTION.setRequired(false);
 
@@ -305,6 +314,8 @@ public class CliFrontendParser {
         PYARCHIVE_OPTION.setRequired(false);
 
         PYEXEC_OPTION.setRequired(false);
+
+        PYCLIENTEXEC_OPTION.setRequired(false);
     }
 
     static final Options RUN_OPTIONS = getRunCommandOptions();
@@ -331,6 +342,7 @@ public class CliFrontendParser {
         options.addOption(PYREQUIREMENTS_OPTION);
         options.addOption(PYARCHIVE_OPTION);
         options.addOption(PYEXEC_OPTION);
+        options.addOption(PYCLIENTEXEC_OPTION);
         return options;
     }
 
@@ -346,6 +358,7 @@ public class CliFrontendParser {
         options.addOption(PYREQUIREMENTS_OPTION);
         options.addOption(PYARCHIVE_OPTION);
         options.addOption(PYEXEC_OPTION);
+        options.addOption(PYCLIENTEXEC_OPTION);
         return options;
     }
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
index 523dd42..4296ad2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptionsUtils.java
@@ -36,6 +36,7 @@ import java.net.URLClassLoader;
 
 import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYMODULE_OPTION;
@@ -67,7 +68,8 @@ public enum ProgramOptionsUtils {
         return line.hasOption(PYFILES_OPTION.getOpt())
                 || line.hasOption(PYREQUIREMENTS_OPTION.getOpt())
                 || line.hasOption(PYARCHIVE_OPTION.getOpt())
-                || line.hasOption(PYEXEC_OPTION.getOpt());
+                || line.hasOption(PYEXEC_OPTION.getOpt())
+                || line.hasOption(PYCLIENTEXEC_OPTION.getOpt());
     }
 
     public static ProgramOptions createPythonProgramOptions(CommandLine line)
diff --git 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index f407c6c..254c8b5 100644
--- 
a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -22,11 +22,14 @@ import 
org.apache.flink.client.deployment.application.UnsuccessfulExecutionExcep
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.python.util.PythonDependencyUtils;
+import org.apache.flink.python.util.ZipUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
 
@@ -64,6 +67,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES;
 import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
 import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
 import static 
org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER;
@@ -72,6 +76,8 @@ import static 
org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER;
 final class PythonEnvUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(PythonEnvUtils.class);
 
+    private static final String PYTHON_ARCHIVES_DIR = "python-archives";
+
     static final String PYFLINK_CLIENT_EXECUTABLE = 
"PYFLINK_CLIENT_EXECUTABLE";
 
     static final long CHECK_INTERVAL = 100;
@@ -86,6 +92,8 @@ final class PythonEnvUtils {
     static class PythonEnvironment {
         String tempDirectory;
 
+        String archivesDirectory;
+
         String pythonExec = OperatingSystem.isWindows() ? "python.exe" : 
"python";
 
         String pythonPath;
@@ -137,6 +145,48 @@ final class PythonEnvUtils {
                             .collect(Collectors.toList());
             addToPythonPath(env, pythonFiles);
         }
+
+        // 5. set the archives directory as the working directory, then user 
could access the
+        // content of the archives via relative path
+        if (config.getOptional(PYTHON_ARCHIVES).isPresent()
+                && (config.getOptional(PYTHON_CLIENT_EXECUTABLE).isPresent()
+                        || !StringUtils.isNullOrWhitespaceOnly(
+                                System.getenv(PYFLINK_CLIENT_EXECUTABLE)))) {
+            env.archivesDirectory = String.join(File.separator, tmpDir, 
PYTHON_ARCHIVES_DIR);
+
+            // extract archives to archives directory
+            config.getOptional(PYTHON_ARCHIVES)
+                    .ifPresent(
+                            pyArchives -> {
+                                for (String archive : 
pyArchives.split(FILE_DELIMITER)) {
+                                    Path archivePath;
+                                    String targetDir;
+                                    if 
(archive.contains(PythonDependencyUtils.PARAM_DELIMITER)) {
+                                        String[] filePathAndTargetDir =
+                                                archive.split(
+                                                        
PythonDependencyUtils.PARAM_DELIMITER, 2);
+                                        archivePath = new 
Path(filePathAndTargetDir[0]);
+                                        targetDir = filePathAndTargetDir[1];
+                                    } else {
+                                        archivePath = new Path(archive);
+                                        targetDir = archivePath.getName();
+                                    }
+                                    try {
+                                        ZipUtils.extractZipFileWithPermissions(
+                                                archivePath.getPath(),
+                                                String.join(
+                                                        File.separator,
+                                                        env.archivesDirectory,
+                                                        targetDir));
+                                    } catch (IOException e) {
+                                        throw new RuntimeException(
+                                                "Extract archives to archives 
directory failed.",
+                                                e);
+                                    }
+                                }
+                            });
+        }
+
         if (entryPointScript != null) {
             addToPythonPath(env, Collections.singletonList(new 
Path(entryPointScript)));
         }
@@ -269,6 +319,9 @@ final class PythonEnvUtils {
                         String.join(File.pathSeparator, pythonEnv.pythonPath, 
defaultPythonPath));
             }
         }
+        if (pythonEnv.archivesDirectory != null) {
+            pythonProcessBuilder.directory(new 
File(pythonEnv.archivesDirectory));
+        }
         pythonEnv.systemEnv.forEach(env::put);
         commands.add(0, pythonEnv.pythonExec);
         pythonProcessBuilder.command(commands);
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java 
b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
index 8055efe..e2a8e21 100644
--- a/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
+++ b/flink-python/src/main/java/org/apache/flink/python/PythonOptions.java
@@ -145,11 +145,10 @@ public class PythonOptions {
                                     .text(
                                             "The path of the Python 
interpreter used to launch the Python process when submitting the "
                                                     + "Python jobs via \"flink 
run\" or compiling the Java/Scala jobs containing Python UDFs. "
-                                                    + "Equivalent to the 
environment variable PYFLINK_CLIENT_EXECUTABLE. "
+                                                    + "Equivalent to the 
command line option \"-pyclientexec\" or the environment variable 
PYFLINK_CLIENT_EXECUTABLE. "
                                                     + "The priority is as 
following: ")
                                     .linebreak()
-                                    .text(
-                                            "1. the configuration 
'python.client.executable' defined in the source code;")
+                                    .text("1. the command line option 
\"-pyclientexec\";")
                                     .linebreak()
                                     .text("2. the environment variable 
PYFLINK_CLIENT_EXECUTABLE;")
                                     .linebreak()
diff --git 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
index 60c1553..c5f8f82 100644
--- 
a/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/python/util/PythonDependencyUtils.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
 import static 
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
@@ -111,6 +112,12 @@ public class PythonDependencyUtils {
                     PythonOptions.PYTHON_EXECUTABLE,
                     commandLine.getOptionValue(PYEXEC_OPTION.getOpt()));
         }
+        if (commandLine.hasOption(PYCLIENTEXEC_OPTION.getOpt())) {
+            config.set(
+                    PythonOptions.PYTHON_CLIENT_EXECUTABLE,
+                    commandLine.getOptionValue(PYCLIENTEXEC_OPTION.getOpt()));
+        }
+
         return config;
     }
 
diff --git 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
index 6105d5c..f94f1c0 100644
--- 
a/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/client/python/PythonEnvUtilsTest.java
@@ -50,6 +50,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.flink.client.python.PythonEnvUtils.PYFLINK_CLIENT_EXECUTABLE;
 import static 
org.apache.flink.client.python.PythonEnvUtils.preparePythonEnvironment;
+import static org.apache.flink.python.PythonOptions.PYTHON_ARCHIVES;
 import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
 import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
 import static 
org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER;
@@ -179,6 +180,14 @@ public class PythonEnvUtilsTest {
     public void testSetPythonExecutable() throws IOException {
         Configuration config = new Configuration();
 
+        File zipFile = new File(tmpDirPath + File.separator + "venv.zip");
+        try (ZipArchiveOutputStream zipOut =
+                new ZipArchiveOutputStream(new FileOutputStream(zipFile))) {
+            ZipArchiveEntry entry = new ZipArchiveEntry("zipDir" + 
"/zipfile0");
+            zipOut.putArchiveEntry(entry);
+            zipOut.write(new byte[] {1, 1, 1, 1, 1});
+            zipOut.closeArchiveEntry();
+        }
         PythonEnvUtils.PythonEnvironment env = 
preparePythonEnvironment(config, null, tmpDirPath);
         if (OperatingSystem.isWindows()) {
             Assert.assertEquals("python.exe", env.pythonExec);
@@ -197,6 +206,24 @@ public class PythonEnvUtilsTest {
             CommonTestUtils.setEnv(systemEnv);
         }
 
+        config.setString(PYTHON_ARCHIVES, zipFile.getPath());
+        systemEnv = new HashMap<>(System.getenv());
+        systemEnv.put(PYFLINK_CLIENT_EXECUTABLE, "venv.zip/venv/bin/python");
+        CommonTestUtils.setEnv(systemEnv);
+        try {
+            env = preparePythonEnvironment(config, null, tmpDirPath);
+            Assert.assertEquals("venv.zip/venv/bin/python", env.pythonExec);
+        } finally {
+            systemEnv.remove(PYFLINK_CLIENT_EXECUTABLE);
+            CommonTestUtils.setEnv(systemEnv);
+        }
+        java.nio.file.Path[] files =
+                FileUtils.listDirectory(new 
File(env.archivesDirectory).toPath());
+        Assert.assertEquals(files.length, 1);
+        Assert.assertEquals(files[0].getFileName().toString(), 
zipFile.getName());
+
+        config.removeConfig(PYTHON_ARCHIVES);
+
         config.set(PYTHON_CLIENT_EXECUTABLE, "/usr/bin/python");
         env = preparePythonEnvironment(config, null, tmpDirPath);
         Assert.assertEquals("/usr/bin/python", env.pythonExec);
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index 00993be..e8da0a9 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static 
org.apache.flink.client.cli.CliFrontendParser.PYCLIENTEXEC_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
 import static 
org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
@@ -160,6 +161,7 @@ public class CliOptionsParser {
         options.addOption(PYREQUIREMENTS_OPTION);
         options.addOption(PYARCHIVE_OPTION);
         options.addOption(PYEXEC_OPTION);
+        options.addOption(PYCLIENTEXEC_OPTION);
         return options;
     }
 
@@ -172,6 +174,7 @@ public class CliOptionsParser {
         options.addOption(PYREQUIREMENTS_OPTION);
         options.addOption(PYARCHIVE_OPTION);
         options.addOption(PYEXEC_OPTION);
+        options.addOption(PYCLIENTEXEC_OPTION);
         return options;
     }
 
@@ -183,6 +186,7 @@ public class CliOptionsParser {
         options.addOption(PYREQUIREMENTS_OPTION);
         options.addOption(PYARCHIVE_OPTION);
         options.addOption(PYEXEC_OPTION);
+        options.addOption(PYCLIENTEXEC_OPTION);
         return options;
     }
 

Reply via email to