This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 722f844  [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)
722f844 is described below

commit 722f8445b7f8f2df90af9e8c4b3cfa231d71129d
Author: Wei Zhong <weizhong0...@gmail.com>
AuthorDate: Wed May 13 09:22:29 2020 +0800

    [FLINK-17612][python][sql-client] Support Python command line options in SQL Client. (#12077)
---
 docs/dev/table/sqlClient.md                        | 79 ++++++++++++++++++++++
 docs/dev/table/sqlClient.zh.md                     | 79 ++++++++++++++++++++++
 .../org/apache/flink/table/client/SqlClient.java   | 13 ++++
 .../apache/flink/table/client/cli/CliOptions.java  | 11 ++-
 .../flink/table/client/cli/CliOptionsParser.java   | 43 +++++++++++-
 5 files changed, 221 insertions(+), 4 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index daa1419..8331efe 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -136,6 +136,10 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            properties.
      -h,--help                             Show the help message with
                                            descriptions of all options.
+     -hist,--history <History file path>   The file which you want to save the
+                                           command history into. If not
+                                           specified, we will auto-generate one
+                                           under your user's home directory.
      -j,--jar <JAR file>                   A JAR file to be imported into the
                                            session. The file might contain
                                            user-defined classes needed for the
@@ -149,8 +153,83 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            statements such as functions, table
                                            sources, or sinks. Can be used
                                            multiple times.
+     -pyarch,--pyArchives <arg>            Add python archive files for job.
+                                           The archive files will be extracted
+                                           to the working directory of the
+                                           python UDF worker. Currently only
+                                           zip-format is supported. For each
+                                           archive file, a target directory can
+                                           be specified. If a target directory
+                                           name is specified, the archive file
+                                           will be extracted to a directory
+                                           with the specified name. Otherwise,
+                                           the archive file will be extracted
+                                           to a directory with the same name as
+                                           the archive file. The files uploaded
+                                           via this option are accessible via
+                                           relative path. '#' could be used as
+                                           the separator of the archive file
+                                           path and the target directory name.
+                                           Comma (',') could be used as the
+                                           separator to specify multiple
+                                           archive files. This option can be
+                                           used to upload the virtual
+                                           environment and the data files used
+                                           in Python UDF (e.g.: --pyArchives
+                                           file:///tmp/py37.zip,file:///tmp/data
+                                           .zip#data --pyExecutable
+                                           py37.zip/py37/bin/python). The data
+                                           files could be accessed in Python
+                                           UDF, e.g.: f = open('data/data.txt',
+                                           'r').
+     -pyexec,--pyExecutable <arg>          Specify the path of the python
+                                           interpreter used to execute the
+                                           python UDF worker (e.g.:
+                                           --pyExecutable
+                                           /usr/local/bin/python3). The python
+                                           UDF worker depends on Python 3.5+,
+                                           Apache Beam (version == 2.19.0), Pip
+                                           (version >= 7.1.0) and SetupTools
+                                           (version >= 37.0.0). Please ensure
+                                           that the specified environment meets
+                                           the above requirements.
+     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
+                                           These files will be added to the
+                                           PYTHONPATH of both the local client
+                                           and the remote python UDF worker. The
+                                           standard python resource file
+                                           suffixes such as .py/.egg/.zip or
+                                           directory are all supported. Comma
+                                           (',') could be used as the separator
+                                           to specify multiple files (e.g.:
+                                           --pyFiles
+                                           file:///tmp/myresource.zip,hdfs:///$n
+                                           amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
+                                           defines the third-party dependencies.
+                                           These dependencies will be installed
+                                           and added to the PYTHONPATH of the
+                                           python UDF worker. A directory which
+                                           contains the installation packages of
+                                           these dependencies could be specified
+                                           optionally. Use '#' as the separator
+                                           if the optional parameter exists
+                                           (e.g.: --pyRequirements
+                                           file:///tmp/requirements.txt#file:///
+                                           tmp/cached_dir).
      -s,--session <session identifier>     The identifier for a session.
                                            'default' is the default identifier.
+     -u,--update <SQL update statement>    Experimental (for testing only!):
+                                           Instructs the SQL Client to
+                                           immediately execute the given update
+                                           statement after starting up. The
+                                           process is shut down after the
+                                           statement has been submitted to the
+                                           cluster and returns an appropriate
+                                           return code. Currently, this feature
+                                           is only supported for INSERT INTO
+                                           statements that declare the target
+                                           sink table.
 {% endhighlight %}
 
 {% top %}
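For context, a minimal sketch (not part of the patch) of how the new Python options surface through the SQL Client parser. It assumes parseEmbeddedModeClient has the signature used by SqlClient, that flink-python is on the classpath for the reflective lookup shown further below, and the file paths are purely illustrative:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.table.client.cli.CliOptions;
    import org.apache.flink.table.client.cli.CliOptionsParser;

    public class PyOptionsSketch {
        public static void main(String[] args) {
            // Illustrative arguments; any of the four new Python options may be combined.
            String[] cliArgs = {
                "--pyFiles", "file:///tmp/udfs.py",
                "--pyExecutable", "/usr/local/bin/python3"
            };
            CliOptions options = CliOptionsParser.parseEmbeddedModeClient(cliArgs);
            // The parsed Python settings are exposed as a regular Flink Configuration.
            Configuration pythonConfig = options.getPythonConfiguration();
            pythonConfig.toMap().forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }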
diff --git a/docs/dev/table/sqlClient.zh.md b/docs/dev/table/sqlClient.zh.md
index 44355c8..c234390 100644
--- a/docs/dev/table/sqlClient.zh.md
+++ b/docs/dev/table/sqlClient.zh.md
@@ -137,6 +137,10 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            properties.
      -h,--help                             Show the help message with
                                            descriptions of all options.
+     -hist,--history <History file path>   The file which you want to save the
+                                           command history into. If not
+                                           specified, we will auto-generate one
+                                           under your user's home directory.
      -j,--jar <JAR file>                   A JAR file to be imported into the
                                            session. The file might contain
                                            user-defined classes needed for the
@@ -150,8 +154,83 @@ Mode "embedded" submits Flink jobs from the local machine.
                                            statements such as functions, table
                                            sources, or sinks. Can be used
                                            multiple times.
+     -pyarch,--pyArchives <arg>            Add python archive files for job.
+                                           The archive files will be extracted
+                                           to the working directory of the
+                                           python UDF worker. Currently only
+                                           zip-format is supported. For each
+                                           archive file, a target directory can
+                                           be specified. If a target directory
+                                           name is specified, the archive file
+                                           will be extracted to a directory
+                                           with the specified name. Otherwise,
+                                           the archive file will be extracted
+                                           to a directory with the same name as
+                                           the archive file. The files uploaded
+                                           via this option are accessible via
+                                           relative path. '#' could be used as
+                                           the separator of the archive file
+                                           path and the target directory name.
+                                           Comma (',') could be used as the
+                                           separator to specify multiple
+                                           archive files. This option can be
+                                           used to upload the virtual
+                                           environment and the data files used
+                                           in Python UDF (e.g.: --pyArchives
+                                           file:///tmp/py37.zip,file:///tmp/data
+                                           .zip#data --pyExecutable
+                                           py37.zip/py37/bin/python). The data
+                                           files could be accessed in Python
+                                           UDF, e.g.: f = open('data/data.txt',
+                                           'r').
+     -pyexec,--pyExecutable <arg>          Specify the path of the python
+                                           interpreter used to execute the
+                                           python UDF worker (e.g.:
+                                           --pyExecutable
+                                           /usr/local/bin/python3). The python
+                                           UDF worker depends on Python 3.5+,
+                                           Apache Beam (version == 2.19.0), Pip
+                                           (version >= 7.1.0) and SetupTools
+                                           (version >= 37.0.0). Please ensure
+                                           that the specified environment meets
+                                           the above requirements.
+     -pyfs,--pyFiles <pythonFiles>         Attach custom python files for job.
+                                           These files will be added to the
+                                           PYTHONPATH of both the local client
+                                           and the remote python UDF worker. The
+                                           standard python resource file
+                                           suffixes such as .py/.egg/.zip or
+                                           directory are all supported. Comma
+                                           (',') could be used as the separator
+                                           to specify multiple files (e.g.:
+                                           --pyFiles
+                                           file:///tmp/myresource.zip,hdfs:///$n
+                                           amenode_address/myresource2.zip).
+     -pyreq,--pyRequirements <arg>         Specify a requirements.txt file which
+                                           defines the third-party dependencies.
+                                           These dependencies will be installed
+                                           and added to the PYTHONPATH of the
+                                           python UDF worker. A directory which
+                                           contains the installation packages of
+                                           these dependencies could be specified
+                                           optionally. Use '#' as the separator
+                                           if the optional parameter exists
+                                           (e.g.: --pyRequirements
+                                           file:///tmp/requirements.txt#file:///
+                                           tmp/cached_dir).
      -s,--session <session identifier>     The identifier for a session.
                                            'default' is the default identifier.
+     -u,--update <SQL update statement>    Experimental (for testing only!):
+                                           Instructs the SQL Client to
+                                           immediately execute the given update
+                                           statement after starting up. The
+                                           process is shut down after the
+                                           statement has been submitted to the
+                                           cluster and returns an appropriate
+                                           return code. Currently, this feature
+                                           is only supported for INSERT INTO
+                                           statements that declare the target
+                                           sink table.
 {% endhighlight %}
 
 {% top %}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
index 3f68dc6..627e70b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.client.cli.CliClient;
 import org.apache.flink.table.client.cli.CliOptions;
 import org.apache.flink.table.client.cli.CliOptionsParser;
@@ -36,7 +37,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.create;
+import static org.apache.flink.table.client.config.entries.ConfigurationEntry.merge;
 
 /**
  * SQL Client for submitting SQL statements. The client can be executed in two
@@ -90,6 +96,7 @@ public class SqlClient {
 
                        // create CLI client with session environment
                        final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
+                       appendPythonConfig(sessionEnv, options.getPythonConfiguration());
                        final SessionContext context;
                        if (options.getSessionId() == null) {
                                context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
@@ -166,6 +173,12 @@ public class SqlClient {
                }
        }
 
+       private static void appendPythonConfig(Environment env, Configuration pythonConfiguration) {
+               Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
+               Map<String, Object> combinedConfig = new HashMap<>(merge(env.getConfiguration(), create(pythonConfig)).asMap());
+               env.setConfiguration(combinedConfig);
+       }
+
        // --------------------------------------------------------------------------------------------
 
        public static void main(String[] args) {
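As a side note on appendPythonConfig above, a standalone sketch of the same merge, spelled out. It assumes an Environment named sessionEnv is in scope, and the "python.files" key is illustrative; merge and create are the ConfigurationEntry helpers statically imported by the patch:

    // Fold CLI-provided Python settings into the session environment's configuration.
    static void foldPythonConfig(Environment sessionEnv) {
        Configuration pythonConfiguration = new Configuration();
        pythonConfiguration.setString("python.files", "file:///tmp/udfs.py");

        Map<String, Object> pythonConfig = new HashMap<>(pythonConfiguration.toMap());
        Map<String, Object> combinedConfig = new HashMap<>(
                merge(sessionEnv.getConfiguration(), create(pythonConfig)).asMap());
        sessionEnv.setConfiguration(combinedConfig);
    }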
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
index 4eaed62..c211f79 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptions.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.configuration.Configuration;
+
 import java.net.URL;
 import java.util.List;
 
@@ -35,6 +37,7 @@ public class CliOptions {
        private final List<URL> libraryDirs;
        private final String updateStatement;
        private final String historyFilePath;
+       private final Configuration pythonConfiguration;
 
        public CliOptions(
                        boolean isPrintHelp,
@@ -44,7 +47,8 @@ public class CliOptions {
                        List<URL> jars,
                        List<URL> libraryDirs,
                        String updateStatement,
-                       String historyFilePath) {
+                       String historyFilePath,
+                       Configuration pythonConfiguration) {
                this.isPrintHelp = isPrintHelp;
                this.sessionId = sessionId;
                this.environment = environment;
@@ -53,6 +57,7 @@ public class CliOptions {
                this.libraryDirs = libraryDirs;
                this.updateStatement = updateStatement;
                this.historyFilePath = historyFilePath;
+               this.pythonConfiguration = pythonConfiguration;
        }
 
        public boolean isPrintHelp() {
@@ -86,4 +91,8 @@ public class CliOptions {
        public String getHistoryFilePath() {
                return historyFilePath;
        }
+
+       public Configuration getPythonConfiguration() {
+               return pythonConfiguration;
+       }
 }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
index a7626bb..2c6b7ef 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliOptionsParser.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.client.cli;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.client.SqlClientException;
 
@@ -29,11 +30,18 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 
 import java.io.File;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.client.cli.CliFrontendParser.PYARCHIVE_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYEXEC_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYFILES_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PYREQUIREMENTS_OPTION;
+
 /**
  * Parser for command line options.
  */
@@ -145,6 +153,10 @@ public class CliOptionsParser {
                options.addOption(OPTION_LIBRARY);
                options.addOption(OPTION_UPDATE);
                options.addOption(OPTION_HISTORY);
+               options.addOption(PYFILES_OPTION);
+               options.addOption(PYREQUIREMENTS_OPTION);
+               options.addOption(PYARCHIVE_OPTION);
+               options.addOption(PYEXEC_OPTION);
                return options;
        }
 
@@ -154,6 +166,10 @@ public class CliOptionsParser {
                options.addOption(OPTION_ENVIRONMENT);
                options.addOption(OPTION_UPDATE);
                options.addOption(OPTION_HISTORY);
+               options.addOption(PYFILES_OPTION);
+               options.addOption(PYREQUIREMENTS_OPTION);
+               options.addOption(PYARCHIVE_OPTION);
+               options.addOption(PYEXEC_OPTION);
                return options;
        }
 
@@ -162,6 +178,10 @@ public class CliOptionsParser {
                options.addOption(OPTION_DEFAULTS);
                options.addOption(OPTION_JAR);
                options.addOption(OPTION_LIBRARY);
+               options.addOption(PYFILES_OPTION);
+               options.addOption(PYREQUIREMENTS_OPTION);
+               options.addOption(PYARCHIVE_OPTION);
+               options.addOption(PYEXEC_OPTION);
                return options;
        }
 
@@ -249,7 +269,8 @@ public class CliOptionsParser {
                                checkUrls(line, CliOptionsParser.OPTION_JAR),
                                checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                                line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
-                               line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+                               line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+                               getPythonConfiguration(line)
                        );
                }
                catch (ParseException e) {
@@ -269,7 +290,8 @@ public class CliOptionsParser {
                                checkUrls(line, CliOptionsParser.OPTION_JAR),
                                checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                                line.getOptionValue(CliOptionsParser.OPTION_UPDATE.getOpt()),
-                               line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt())
+                               line.getOptionValue(CliOptionsParser.OPTION_HISTORY.getOpt()),
+                               getPythonConfiguration(line)
                        );
                }
                catch (ParseException e) {
@@ -289,7 +311,8 @@ public class CliOptionsParser {
                                checkUrls(line, CliOptionsParser.OPTION_JAR),
                                checkUrls(line, CliOptionsParser.OPTION_LIBRARY),
                                null,
-                               null
+                               null,
+                               getPythonConfiguration(line)
                        );
                }
                catch (ParseException e) {
@@ -331,4 +354,18 @@ public class CliOptionsParser {
                }
                return sessionId;
        }
+
+       private static Configuration getPythonConfiguration(CommandLine line) {
+               try {
+                       Class<?> clazz = Class.forName(
+                               "org.apache.flink.python.util.PythonDependencyUtils",
+                               true,
+                               Thread.currentThread().getContextClassLoader());
+                       Method parsePythonDependencyConfiguration =
+                               clazz.getMethod("parsePythonDependencyConfiguration", CommandLine.class);
+                       return (Configuration) parsePythonDependencyConfiguration.invoke(null, line);
+               } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+                       throw new SqlClientException("Failed to parse the Python command line options.", e);
+               }
+       }
 }
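The reflective lookup in getPythonConfiguration keeps flink-sql-client free of a compile-time dependency on flink-python; as written, a missing flink-python module at runtime surfaces as a SqlClientException. A hypothetical variation (not in the patch) that instead treats an absent flink-python as "no Python options":

    private static Configuration getPythonConfigurationOrEmpty(CommandLine line) {
        try {
            Class<?> clazz = Class.forName(
                    "org.apache.flink.python.util.PythonDependencyUtils",
                    true,
                    Thread.currentThread().getContextClassLoader());
            Method parse = clazz.getMethod("parsePythonDependencyConfiguration", CommandLine.class);
            return (Configuration) parse.invoke(null, line);
        } catch (ClassNotFoundException e) {
            // flink-python is not on the classpath: fall back to an empty configuration.
            return new Configuration();
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new SqlClientException("Failed to parse the Python command line options.", e);
        }
    }

The trade-off is that Python options passed on the command line would then be silently ignored when flink-python is missing, so the patch's fail-fast behavior is arguably the safer default.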
