This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b04a50ae21 [FLINK-28114][python] Fix the issue that the Python client interpreter could not point to an archive file in distributed file system
6b04a50ae21 is described below

commit 6b04a50ae2182d4cdd8e44ea9a16171d1d2394ce
Author: Dian Fu <dia...@apache.org>
AuthorDate: Sun Jun 19 22:47:58 2022 +0800

    [FLINK-28114][python] Fix the issue that the Python client interpreter could not point to an archive file in distributed file system
---
 docs/content.zh/docs/deployment/cli.md             |  3 +++
 docs/content/docs/deployment/cli.md                |  3 +++
 .../apache/flink/client/python/PythonEnvUtils.java | 27 +++++++++++++++++++---
 3 files changed, 30 insertions(+), 3 deletions(-)
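The Java change below boils down to: if the archive path points into a distributed file system, first copy it into a unique local temp subdirectory, then extract from the local copy. A standalone sketch of that idea, with `java.nio` standing in for Flink's `FileSystem`/`FileUtils` and all names here being hypothetical, not Flink API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class LocalizeArchive {

    // Mirrors the patch's idea: when the archive lives on a distributed file
    // system, copy it into <tempDirectory>/<random UUID>/<original name> so
    // the subsequent extraction always reads a local file. "isDistributed"
    // stands in for Flink's FileSystem#isDistributedFS() check.
    static Path localize(Path archive, Path tempDirectory, boolean isDistributed)
            throws IOException {
        if (!isDistributed) {
            return archive; // already local, extract in place
        }
        Path target =
                tempDirectory
                        .resolve(UUID.randomUUID().toString())
                        .resolve(archive.getFileName().toString());
        Files.createDirectories(target.getParent());
        Files.copy(archive, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path temp = Files.createTempDirectory("pyflink-test");
        Path archive = Files.createTempFile("venv", ".zip");
        Files.write(archive, new byte[] {1, 2, 3});

        // A local archive is returned unchanged.
        if (!localize(archive, temp, false).equals(archive)) {
            throw new AssertionError("local archive should be returned as-is");
        }

        // A "remote" archive is first copied under the local temp directory.
        Path local = localize(archive, temp, true);
        if (!local.startsWith(temp)) {
            throw new AssertionError("remote archive should be copied to temp dir");
        }
        if (Files.readAllBytes(local).length != 3) {
            throw new AssertionError("copied content should match the original");
        }
        System.out.println("ok");
    }
}
```

The UUID subdirectory matches the patch's way of avoiding name collisions when two archives share the same file name.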

diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md
index a4e5d3ea770..981ab7747c1 100644
--- a/docs/content.zh/docs/deployment/cli.md
+++ b/docs/content.zh/docs/deployment/cli.md
@@ -434,6 +434,9 @@ $ ./bin/flink run-application -t yarn-application \
 
 <span class="label label-info">Note</span> As it executes the job on the JobManager in YARN application mode, the paths specified in `-pyarch` and `-py` are paths relative to `shipfiles`, which is the directory name of the shipped files.
 
+<span class="label label-info">Note</span> The archive files specified via `-pyarch` will be distributed to the TaskManagers through the blob server, where the file size limit is 2 GB.
+If an archive file is larger than 2 GB, you could upload it to a distributed file system and then pass its path to the command line option `-pyarch`.
+
 - Run a PyFlink application on a native Kubernetes cluster with cluster ID `<ClusterId>`. This requires a docker image with PyFlink installed; please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
 ```bash
 $ ./bin/flink run-application \
diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md
index 8f81153d9e7..783b302bc6c 100644
--- a/docs/content/docs/deployment/cli.md
+++ b/docs/content/docs/deployment/cli.md
@@ -432,6 +432,9 @@ $ ./bin/flink run-application -t yarn-application \
 
 <span class="label label-info">Note</span> As it executes the job on the JobManager in YARN application mode, the paths specified in `-pyarch` and `-py` are paths relative to `shipfiles`, which is the directory name of the shipped files.
 
+<span class="label label-info">Note</span> The archive files specified via `-pyarch` will be distributed to the TaskManagers through the blob server, where the file size limit is 2 GB.
+If an archive file is larger than 2 GB, you could upload it to a distributed file system and then pass its path to the command line option `-pyarch`.
+
 - Run a PyFlink application on a native Kubernetes cluster with cluster ID `<ClusterId>`. This requires a docker image with PyFlink installed; please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
 ```bash
 $ ./bin/flink run-application \
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 261aa82dc76..f018703dcdc 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -171,12 +171,33 @@ final class PythonEnvUtils {
                                         originalFileName = archivePath.getName();
                                     } else {
                                         archivePath = new Path(archive);
-                                        targetDirName = archivePath.getName();
-                                        originalFileName = targetDirName;
+                                        originalFileName = archivePath.getName();
+                                        targetDirName = originalFileName;
+                                    }
+
+                                    Path localArchivePath = archivePath;
+                                    try {
+                                        if (archivePath.getFileSystem().isDistributedFS()) {
+                                            localArchivePath =
+                                                    new Path(
+                                                            env.tempDirectory,
+                                                            String.join(
+                                                                    File.separator,
+                                                                    UUID.randomUUID().toString(),
+                                                                    originalFileName));
+                                            FileUtils.copy(archivePath, localArchivePath, false);
+                                        }
+                                    } catch (IOException e) {
+                                        String msg =
+                                                String.format(
+                                                        "Error occurred when copying %s to %s.",
+                                                        archivePath, localArchivePath);
+                                        throw new RuntimeException(msg, e);
                                     }
+
                                     try {
                                         CompressionUtils.extractFile(
-                                                archivePath.getPath(),
+                                                localArchivePath.getPath(),
                                                 String.join(
                                                         File.separator,
                                                         env.archivesDirectory,
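
For reference, the documentation note added above amounts to a submission flow like the following sketch. The HDFS path, archive name, and entry script are hypothetical; only the `-pyarch`, `-pyclientexec`, and `-py` options come from the documented CLI:

```bash
# Hypothetical: upload a large (> 2 GB) venv archive to a distributed file
# system first, so it does not go through the blob server's 2 GB limit.
$ hadoop fs -put venv.zip hdfs:///tmp/venv.zip

# Reference the remote archive directly in -pyarch; with this fix the client
# copies it to a local temporary directory before extracting it.
$ ./bin/flink run-application -t yarn-application \
      -pyarch hdfs:///tmp/venv.zip \
      -pyclientexec venv.zip/venv/bin/python3 \
      -py word_count.py
```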
