This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 6b04a50ae21 [FLINK-28114][python] Fix the issue that the Python client interpreter could not point to an archive file in distributed file system

6b04a50ae21 is described below

commit 6b04a50ae2182d4cdd8e44ea9a16171d1d2394ce
Author: Dian Fu <dia...@apache.org>
AuthorDate: Sun Jun 19 22:47:58 2022 +0800

    [FLINK-28114][python] Fix the issue that the Python client interpreter could not point to an archive file in distributed file system
---
 docs/content.zh/docs/deployment/cli.md             |  3 +++
 docs/content/docs/deployment/cli.md                |  3 +++
 .../apache/flink/client/python/PythonEnvUtils.java | 27 +++++++++++++++++++---
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/deployment/cli.md b/docs/content.zh/docs/deployment/cli.md
index a4e5d3ea770..981ab7747c1 100644
--- a/docs/content.zh/docs/deployment/cli.md
+++ b/docs/content.zh/docs/deployment/cli.md
@@ -434,6 +434,9 @@ $ ./bin/flink run-application -t yarn-application \
 <span class="label label-info">Note</span> As it executes the job on the JobManager in YARN application mode, the paths specified in `-pyarch` and `-py` are paths relative to `shipfiles` which is the directory name of the shipped files.
 
+<span class="label label-info">Note</span> The archive files specified via `-pyarch` will be distributed to the TaskManagers through blob server where the file size limit is 2 GB.
+If the size of an archive file is more than 2 GB, you could upload it to a distributed file system and then use the path in the command line option `-pyarch`.
+
 - Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
 
 ```bash
 $ ./bin/flink run-application \

diff --git a/docs/content/docs/deployment/cli.md b/docs/content/docs/deployment/cli.md
index 8f81153d9e7..783b302bc6c 100644
--- a/docs/content/docs/deployment/cli.md
+++ b/docs/content/docs/deployment/cli.md
@@ -432,6 +432,9 @@ $ ./bin/flink run-application -t yarn-application \
 <span class="label label-info">Note</span> As it executes the job on the JobManager in YARN application mode, the paths specified in `-pyarch` and `-py` are paths relative to `shipfiles` which is the directory name of the shipped files.
 
+<span class="label label-info">Note</span> The archive files specified via `-pyarch` will be distributed to the TaskManagers through blob server where the file size limit is 2 GB.
+If the size of an archive file is more than 2 GB, you could upload it to a distributed file system and then use the path in the command line option `-pyarch`.
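The note added above describes the workaround this commit enables: bypass the 2 GB blob-server limit by hosting the archive on a distributed file system and pointing `-pyarch` at it. A hedged sketch of what that looks like on the command line (the HDFS path, archive name, and job file are illustrative, not from the commit):

```shell
# Upload the large virtual-environment archive to a distributed file
# system instead of shipping it through the blob server (illustrative paths).
hadoop fs -put venv.zip hdfs:///tmp/venv.zip

# Reference the remote archive directly; with this fix the client-side
# interpreter can also live inside such an archive.
./bin/flink run \
    -pyarch hdfs:///tmp/venv.zip \
    -pyclientexec venv.zip/venv/bin/python \
    -py my_job.py
```
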
+
 - Run a PyFlink application on a native Kubernetes cluster having the cluster ID `<ClusterId>`, it requires a docker image with PyFlink installed, please refer to [Enabling PyFlink in docker]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#enabling-python):
 
 ```bash
 $ ./bin/flink run-application \

diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 261aa82dc76..f018703dcdc 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -171,12 +171,33 @@ final class PythonEnvUtils {
                 originalFileName = archivePath.getName();
             } else {
                 archivePath = new Path(archive);
-                targetDirName = archivePath.getName();
-                originalFileName = targetDirName;
+                originalFileName = archivePath.getName();
+                targetDirName = originalFileName;
+            }
+
+            Path localArchivePath = archivePath;
+            try {
+                if (archivePath.getFileSystem().isDistributedFS()) {
+                    localArchivePath =
+                            new Path(
+                                    env.tempDirectory,
+                                    String.join(
+                                            File.separator,
+                                            UUID.randomUUID().toString(),
+                                            originalFileName));
+                    FileUtils.copy(archivePath, localArchivePath, false);
+                }
+            } catch (IOException e) {
+                String msg =
+                        String.format(
+                                "Error occurred when copying %s to %s.",
+                                archivePath, localArchivePath);
+                throw new RuntimeException(msg, e);
             }
+
             try {
                 CompressionUtils.extractFile(
-                        archivePath.getPath(),
+                        localArchivePath.getPath(),
                         String.join(
                                 File.separator,
                                 env.archivesDirectory,
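The Java change stages an archive that lives on a distributed file system into a UUID-named local temp directory before handing it to `CompressionUtils.extractFile`, so extraction always reads a local file. A minimal standalone sketch of that staging pattern, using plain `java.nio` in place of Flink's `Path`/`FileUtils` API (the class and method names here are made up for illustration, not Flink's):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class StageArchive {

    // Copy an archive into a unique local directory before extraction.
    // The UUID-named subdirectory mirrors the fix's layout and avoids
    // collisions between archives that share the same file name.
    public static Path stageLocally(Path archive, Path tempRoot) throws IOException {
        Path target = tempRoot
                .resolve(UUID.randomUUID().toString())
                .resolve(archive.getFileName());
        Files.createDirectories(target.getParent());
        Files.copy(archive, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("stage-demo");
        Path src = Files.writeString(tmp.resolve("venv.zip"), "dummy archive bytes");
        Path staged = stageLocally(src, tmp);
        System.out.println("staged to: " + staged);
    }
}
```

In the actual fix the copy only happens when `archivePath.getFileSystem().isDistributedFS()` is true; local archives are extracted in place, which this sketch does not distinguish.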