This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e63aa12252843d0098a56f3091b28d48aff5b5af
Author: Ferenc Csaky <ferenc.cs...@pm.me>
AuthorDate: Wed Jan 10 17:19:57 2024 +0100

    [FLINK-28915][k8s] Support fetching remote job jar and additional dependencies on Kubernetes
    
    Closes #24065.
---
 docs/content.zh/docs/deployment/config.md          |   9 +-
 .../resource-providers/native_kubernetes.md        |  16 +-
 .../resource-providers/standalone/docker.md        |  28 +-
 .../resource-providers/standalone/kubernetes.md    |   6 +-
 docs/content/docs/deployment/config.md             |   9 +-
 .../resource-providers/native_kubernetes.md        |  15 +-
 .../resource-providers/standalone/docker.md        |  28 +-
 .../resource-providers/standalone/kubernetes.md    |   6 +-
 .../resource-providers/standalone/overview.md      |  16 +-
 .../generated/artifact_fetch_configuration.html    |  18 +-
 .../generated/kubernetes_config_configuration.html |   6 -
 flink-clients/pom.xml                              |   8 +
 .../flink/client/cli/ArtifactFetchOptions.java     |  30 ++-
 .../program/DefaultPackagedProgramRetriever.java   |  80 ++++--
 .../program/artifact/ArtifactFetchManager.java     | 195 ++++++++++++++
 .../client/program/artifact/ArtifactFetcher.java   |   7 +-
 .../client/program/artifact/ArtifactUtils.java     |  50 ++--
 ...ArtifactFetcher.java => FsArtifactFetcher.java} |  14 +-
 .../program/artifact/HttpArtifactFetcher.java      |  12 +-
 .../program/artifact/LocalArtifactFetcher.java     |  42 +++
 .../DefaultPackagedProgramRetrieverITCase.java     | 125 ++++++++-
 .../program/artifact/ArtifactFetchManagerTest.java | 281 +++++++++++++++++++++
 .../client/program/artifact/ArtifactUtilsTest.java | 123 +--------
 .../client/testjar/ClasspathProviderExtension.java |  11 +
 .../StandaloneApplicationClusterConfiguration.java |  14 +-
 ...plicationClusterConfigurationParserFactory.java |  19 +-
 .../StandaloneApplicationClusterEntryPoint.java    |  54 ++--
 ...ationClusterConfigurationParserFactoryTest.java |  30 ++-
 ...StandaloneApplicationClusterEntryPointTest.java |  56 ----
 .../flink/configuration/GlobalConfiguration.java   |   3 +-
 .../configuration/KubernetesConfigOptions.java     |  28 --
 .../KubernetesApplicationClusterEntrypoint.java    |  55 ++--
 .../decorators/InitJobManagerDecorator.java        |  38 +--
 .../parameters/KubernetesJobManagerParameters.java |   2 +-
 ...KubernetesApplicationClusterEntrypointTest.java |   3 +-
 .../factory/KubernetesJobManagerFactoryTest.java   |  24 +-
 .../KubernetesJobManagerParametersTest.java        |   2 +-
 .../flink-clients-test-utils/pom.xml               |  20 ++
 .../TestUserClassLoaderAdditionalArtifact.java     |  23 +-
 39 files changed, 1000 insertions(+), 506 deletions(-)
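
Taken together, the change lets the cluster entrypoint fetch the job JAR and additional dependencies before the application starts. A minimal sketch of the new CLI surface in standalone mode, using the options this commit documents (the bucket and host names are hypothetical):

```bash
# Fetch the job JAR from S3 and an extra UDF JAR over HTTPS before the job starts.
$ ./bin/standalone-job.sh start \
    -D user.artifacts.base-dir=/opt/flink/artifacts \
    --job-classname com.job.ClassName \
    --jars s3://my-bucket/my-flink-job.jar,https://repo.example.com/my-flink-udf.jar
```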

diff --git a/docs/content.zh/docs/deployment/config.md 
b/docs/content.zh/docs/deployment/config.md
index 7561852e4d0..35b85240d03 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -318,10 +318,13 @@ See the [History Server Docs]({{< ref 
"docs/deployment/advanced/historyserver" >
 ----
 ----
 
-# Artifact Fetch
+# Artifact Fetching
+
+Flink can fetch user artifacts stored locally, on remote DFS, or accessible 
via an HTTP(S) endpoint.
+{{< hint info >}}
+**Note:** This is only supported in Standalone Application Mode and Native 
Kubernetes Application Mode.
+{{< /hint >}}
 
-*Artifact Fetch* is a features that Flink will fetch user artifact stored in 
DFS or download by HTTP/HTTPS.
-Note that it is only supported in StandAlone Application Mode and Native 
Kubernetes Application Mode.
 {{< generated/artifact_fetch_configuration >}}
 
 ----
diff --git 
a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md 
b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
index 048a4e800c1..4f3a537cdbc 100644
--- a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
+++ b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
@@ -107,26 +107,24 @@ $ ./bin/flink run-application \
 # FileSystem
 $ ./bin/flink run-application \
     --target kubernetes-application \
-    
-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar
 \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image=custom-image-name \
     s3://my-bucket/my-flink-job.jar
 
-# Http/Https Schema
+# HTTP(S)
 $ ./bin/flink run-application \
     --target kubernetes-application \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image=custom-image-name \
-    http://ip:port/my-flink-job.jar
+    https://ip:port/my-flink-job.jar
 ```
 {{< hint info >}}
-Now, The jar artifact supports downloading from the [flink filesystem]({{< ref 
"docs/deployment/filesystems/overview" >}}) or Http/Https in Application Mode.  
-The jar package will be downloaded from filesystem to
-[user.artifacts.base.dir]({{< ref "docs/deployment/config" 
>}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref 
"docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< 
ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
+JAR fetching supports downloading from [filesystems]({{< ref 
"docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.  
+The JAR will be downloaded to the
+[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path inside the container.
 {{< /hint >}}
 
-<span class="label label-info">Note</span> `local` schema is still supported. 
If you use `local` schema,  the jar must be provided in the image or download 
by a init container like [Example]({{< ref 
"docs/deployment/resource-providers/native_kubernetes" 
>}}#example-of-pod-template).
-
+<span class="label label-info">Note</span> The `local` scheme is still supported. If you use the `local` scheme, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).
 
 The `kubernetes.cluster-id` option specifies the cluster name and must be 
unique.
 If you do not specify this option, then Flink will generate a random name.
@@ -348,7 +346,7 @@ $ kubectl create clusterrolebinding 
flink-role-binding-default --clusterrole=edi
 ```
 
 If you do not want to use the `default` service account, use the following 
command to create a new `flink-service-account` service account and set the 
role binding.
-Then use the config option 
`-Dkubernetes.service-account=flink-service-account` to make the JobManager pod 
use the `flink-service-account` service account to create/delete TaskManager 
pods and leader ConfigMaps.
+Then use the config option 
`-Dkubernetes.service-account=flink-service-account` to configure the 
JobManager pod's service account used to create and delete TaskManager pods and 
leader ConfigMaps.
 Also this will allow the TaskManager to watch leader ConfigMaps to retrieve 
the address of JobManager and ResourceManager.
 
 ```bash
diff --git 
a/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md 
b/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md
index ea12beb65ec..a4c6a37819f 100644
--- a/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md
+++ b/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md
@@ -121,7 +121,7 @@ The *job artifacts* are included into the class path of 
Flink's JVM process with
 * all other necessary dependencies or resources, not included into Flink.
 
 To deploy a cluster for a single job with Docker, you need to
-* make *job artifacts* available locally in all containers under 
`/opt/flink/usrlib` or pass jar path by *jar-file* argument. 
+* make *job artifacts* available locally in all containers under 
`/opt/flink/usrlib`, or pass a list of jars via the `--jars` argument
 * start a JobManager container in the *Application cluster* mode
 * start the required number of TaskManager containers.
 
@@ -156,7 +156,6 @@ To make the **job artifacts available** locally in the 
container, you can
 
 * **or extend the Flink image** by writing a custom `Dockerfile`, build it and 
use it for starting the JobManager and TaskManagers:
 
-
     ```dockerfile
     FROM flink
     ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
@@ -175,8 +174,7 @@ To make the **job artifacts available** locally in the 
container, you can
     $ docker run flink_with_job_artifacts taskmanager
     ```
 
-* **or pass jar path by jar-file argument**  when you start the JobManager:
-
+* **or pass the JAR paths via the `--jars` argument** when you start the JobManager:
 
     ```sh
     $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
@@ -184,27 +182,31 @@ To make the **job artifacts available** locally in the 
container, you can
 
     $ docker run \
         --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
-        --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \
+        --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \
         --name=jobmanager \
         --network flink-network \
         flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< 
/stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
         --job-classname com.job.ClassName \
-        --jar-file s3://my-bucket/my-flink-job.jar
+        --jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar 
\
         [--job-id <job id>] \
         [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
         [job arguments]
-  
+    ```
+
 The `standalone-job` argument starts a JobManager container in the Application 
Mode.
 
 #### JobManager additional command line arguments
 
 You can provide the following additional command line arguments to the cluster 
entrypoint:
 
-* `--job-classname <job class name>`: Class name of the job to run.
+* `--job-classname <job class name>` (optional): Class name of the job to run.
 
   By default, Flink scans its class path for a JAR with a Main-Class or 
program-class manifest entry and chooses it as the job class.
   Use this command line argument to manually set the job class.
+
+  {{< hint warning >}}
   This argument is required in case that no or more than one JAR with such a 
manifest entry is available on the class path.
+  {{< /hint >}}
 
 * `--job-id <job id>` (optional): Manually set a Flink job ID for the job 
(default: 00000000000000000000000000000000)
 
@@ -216,12 +218,12 @@ You can provide the following additional command line 
arguments to the cluster e
 
 * `--allowNonRestoredState` (optional): Skip broken savepoint state
 
-  Additionally you can specify this argument to allow that savepoint state is 
skipped which cannot be restored.
+  Additionally, you can specify this argument to allow skipping savepoint state that cannot be restored.
 
-* `--jar-file` (optional): the path of jar artifact 
+* `--jars` (optional): a comma-separated list of paths to the job JAR and any additional artifacts
 
-  You can specify this argument to point the job artifacts stored in [flink 
filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. 
Flink will fetch it when deploy the job.
-  (e.g., s3://my-bucket/my-flink-job.jar).
+  You can use this argument to point to job artifacts stored in a [Flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or downloadable via HTTP(S).
+  Flink will fetch these during job deployment (e.g. `--jars s3://my-bucket/my-flink-job.jar` or `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar`).
 
 If the main function of the user job main class accepts arguments, you can 
also pass them at the end of the `docker run` command.
 
@@ -326,7 +328,7 @@ services:
     image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< 
/stable >}}{{< unstable >}}latest{{< /unstable >}}
     ports:
       - "8081:8081"
-    command: standalone-job --job-classname com.job.ClassName [--job-id <job 
id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] 
["--jar-file" "/path/to/user-artifact"] [job arguments]
+    command: standalone-job --job-classname com.job.ClassName [--jars 
/path/to/artifact1,/path/to/artifact2] [--job-id <job id>] [--fromSavepoint 
/path/to/savepoint] [--allowNonRestoredState] [job arguments]
     volumes:
       - /host/path/to/job/artifacts:/opt/flink/usrlib
     environment:
diff --git 
a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md 
b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
index c1a3069ac51..95fb16d7dec 100644
--- 
a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
+++ 
b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -120,7 +120,7 @@ $ ./bin/flink run -m localhost:8081 
./examples/streaming/TopSpeedWindowing.jar
 
 * 可以从 [资源定义示例](#application-cluster-resource-definitions) 中的 
`job-artifacts-volume` 处获取。假如是在 minikube 集群中创建这些组件,那么定义示例中的 
job-artifacts-volume 可以挂载为主机的本地目录。如果不使用 minikube 集群,那么可以使用 Kubernetes 
集群中任何其它可用类型的 volume 来提供 *job artifacts*
 * 构建一个已经包含 *job artifacts* 参数的[自定义镜像]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#advanced-customization)。
-* 通过指定[--jar file]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#jobmanager-additional-command-line-arguments)参数提供 
存储在DFS或者可由HTTP/HTTPS下载的*job artifacts*路径
+* 通过指定[--jars]({{< ref "docs/deployment/resource-providers/standalone/docker" 
>}}#jobmanager-additional-command-line-arguments)参数提供 
存储在DFS或者可由HTTP/HTTPS下载的*job artifacts*路径
 
 在创建[通用集群组件](#common-cluster-resource-definitions)后,指定 [Application 
集群资源定义](#application-cluster-resource-definitions)文件,执行 `kubectl` 命令来启动 Flink 
Application 集群:
 
@@ -627,7 +627,7 @@ spec:
         - name: jobmanager
           image: apache/flink:{{< stable >}}{{< version >}}-scala{{< 
scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
           env:
-          args: ["standalone-job", "--job-classname", "com.job.ClassName", 
<optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", 
"--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", 
"--jar-file", "/path/to/user-artifact"]
+          args: ["standalone-job", "--job-classname", "com.job.ClassName", 
<optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", 
"--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", 
"/path/to/savepoint", "--allowNonRestoredState"]
           ports:
             - containerPort: 6123
               name: rpc
@@ -686,7 +686,7 @@ spec:
                 apiVersion: v1
                 fieldPath: status.podIP
           # 下面的 args 参数会使用 POD_IP 对应的值覆盖 config map 中 jobmanager.rpc.address 
的属性值。
-          args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", 
"com.job.ClassName", <optional arguments>, <job arguments>] # 可选参数项: 
["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", 
"--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"]
+          args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", 
"com.job.ClassName", <optional arguments>, <job arguments>] # 可选参数项: 
["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", 
"--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
           ports:
             - containerPort: 6123
               name: rpc
diff --git a/docs/content/docs/deployment/config.md 
b/docs/content/docs/deployment/config.md
index 53f03a29cfb..c6546630888 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -320,10 +320,13 @@ See the [History Server Docs]({{< ref 
"docs/deployment/advanced/historyserver" >
 ----
 ----
 
-# Artifact Fetch
+# Artifact Fetching
+
+Flink can fetch user artifacts stored locally, on remote DFS, or accessible 
via an HTTP(S) endpoint.
+{{< hint info >}}
+**Note:** This is only supported in Standalone Application Mode and Native 
Kubernetes Application Mode.
+{{< /hint >}}
 
-*Artifact Fetch* is a features that Flink will fetch user artifact stored in 
DFS or download by HTTP/HTTPS.
-Note that it is only supported in StandAlone Application Mode and Native 
Kubernetes Application Mode.
 {{< generated/artifact_fetch_configuration >}}
 
 ----
diff --git 
a/docs/content/docs/deployment/resource-providers/native_kubernetes.md 
b/docs/content/docs/deployment/resource-providers/native_kubernetes.md
index 0bc75c26f2b..8b676ab983b 100644
--- a/docs/content/docs/deployment/resource-providers/native_kubernetes.md
+++ b/docs/content/docs/deployment/resource-providers/native_kubernetes.md
@@ -119,19 +119,20 @@ $ ./bin/flink run-application \
     -Dkubernetes.container.image=custom-image-name \
     s3://my-bucket/my-flink-job.jar
 
-# Http/Https Schema
+# HTTP(S)
 $ ./bin/flink run-application \
     --target kubernetes-application \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image=custom-image-name \
-    http://ip:port/my-flink-job.jar
+    https://ip:port/my-flink-job.jar
 ```
 {{< hint info >}}
-Now, The jar artifact supports downloading from the [flink filesystem]({{< ref 
"docs/deployment/filesystems/overview" >}}) or Http/Https in Application Mode.  
-The jar package will be downloaded from filesystem to
-[user.artifacts.base.dir]({{< ref "docs/deployment/config" 
>}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref 
"docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< 
ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
+JAR fetching supports downloading from [filesystems]({{< ref 
"docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.  
+The JAR will be downloaded to the
+[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path inside the container.
 {{< /hint >}}
-<span class="label label-info">Note</span> `local` schema is still supported. 
If you use `local` schema,  the jar must be provided in the image or download 
by a init container like [Example]({{< ref 
"docs/deployment/resource-providers/native_kubernetes" 
>}}#example-of-pod-template).
+
+<span class="label label-info">Note</span> The `local` scheme is still supported. If you use the `local` scheme, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template).
 
 The `kubernetes.cluster-id` option specifies the cluster name and must be 
unique.
 If you do not specify this option, then Flink will generate a random name.
@@ -353,7 +354,7 @@ $ kubectl create clusterrolebinding 
flink-role-binding-default --clusterrole=edi
 ```
 
 If you do not want to use the `default` service account, use the following 
command to create a new `flink-service-account` service account and set the 
role binding.
-Then use the config option 
`-Dkubernetes.service-account=flink-service-account` to make the JobManager pod 
use the `flink-service-account` service account to create/delete TaskManager 
pods and leader ConfigMaps.
+Then use the config option 
`-Dkubernetes.service-account=flink-service-account` to configure the 
JobManager pod's service account used to create and delete TaskManager pods and 
leader ConfigMaps.
 Also this will allow the TaskManager to watch leader ConfigMaps to retrieve 
the address of JobManager and ResourceManager.
 
 ```bash
diff --git 
a/docs/content/docs/deployment/resource-providers/standalone/docker.md 
b/docs/content/docs/deployment/resource-providers/standalone/docker.md
index 3b9917089da..ba355b69ced 100644
--- a/docs/content/docs/deployment/resource-providers/standalone/docker.md
+++ b/docs/content/docs/deployment/resource-providers/standalone/docker.md
@@ -121,7 +121,7 @@ The *job artifacts* are included into the class path of 
Flink's JVM process with
 * all other necessary dependencies or resources, not included into Flink.
 
 To deploy a cluster for a single job with Docker, you need to
-* make *job artifacts* available locally in all containers under 
`/opt/flink/usrlib` or pass jar path by *jar-file* argument. 
+* make *job artifacts* available locally in all containers under 
`/opt/flink/usrlib`, or pass a list of jars via the `--jars` argument
 * start a JobManager container in the *Application cluster* mode
 * start the required number of TaskManager containers.
 
@@ -156,7 +156,6 @@ To make the **job artifacts available** locally in the 
container, you can
 
 * **or extend the Flink image** by writing a custom `Dockerfile`, build it and 
use it for starting the JobManager and TaskManagers:
 
-
     ```dockerfile
     FROM flink
     ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
@@ -175,8 +174,7 @@ To make the **job artifacts available** locally in the 
container, you can
     $ docker run flink_with_job_artifacts taskmanager
     ```
 
-* **or pass jar path by jar-file argument**  when you start the JobManager:
-
+* **or pass the JAR paths via the `--jars` argument** when you start the JobManager:
 
     ```sh
     $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
@@ -184,27 +182,31 @@ To make the **job artifacts available** locally in the 
container, you can
 
     $ docker run \
         --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
-        --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \
+        --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \
         --name=jobmanager \
         --network flink-network \
         flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< 
/stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
         --job-classname com.job.ClassName \
-        --jar-file s3://my-bucket/my-flink-job.jar
+        --jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar 
\
         [--job-id <job id>] \
         [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
         [job arguments]
-  
+    ```
+
 The `standalone-job` argument starts a JobManager container in the Application 
Mode.
 
 #### JobManager additional command line arguments
 
 You can provide the following additional command line arguments to the cluster 
entrypoint:
 
-* `--job-classname <job class name>`: Class name of the job to run.
+* `--job-classname <job class name>` (optional): Class name of the job to run.
 
   By default, Flink scans its class path for a JAR with a Main-Class or 
program-class manifest entry and chooses it as the job class.
   Use this command line argument to manually set the job class.
+
+  {{< hint warning >}}
   This argument is required in case that no or more than one JAR with such a 
manifest entry is available on the class path.
+  {{< /hint >}}
 
 * `--job-id <job id>` (optional): Manually set a Flink job ID for the job 
(default: 00000000000000000000000000000000)
 
@@ -216,12 +218,12 @@ You can provide the following additional command line 
arguments to the cluster e
 
 * `--allowNonRestoredState` (optional): Skip broken savepoint state
 
-  Additionally you can specify this argument to allow that savepoint state is 
skipped which cannot be restored.
+  Additionally, you can specify this argument to allow skipping savepoint state that cannot be restored.
 
-* `--jar-file` (optional): the path of jar artifact 
+* `--jars` (optional): a comma-separated list of paths to the job JAR and any additional artifacts
 
-  You can specify this argument to point the job artifacts stored in [flink 
filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. 
Flink will fetch it when deploy the job.
-  (e.g., s3://my-bucket/my-flink-job.jar).
+  You can use this argument to point to job artifacts stored in a [Flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or downloadable via HTTP(S).
+  Flink will fetch these during job deployment (e.g. `--jars s3://my-bucket/my-flink-job.jar` or `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar`).
 
 If the main function of the user job main class accepts arguments, you can 
also pass them at the end of the `docker run` command.
 
@@ -326,7 +328,7 @@ services:
     image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< 
/stable >}}{{< unstable >}}latest{{< /unstable >}}
     ports:
       - "8081:8081"
-    command: standalone-job --job-classname com.job.ClassName [--job-id <job 
id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] 
["--jar-file" "/path/to/user-artifact"] [job arguments]
+    command: standalone-job --job-classname com.job.ClassName [--job-id <job 
id>] [--jars /path/to/artifact1,/path/to/artifact2] [--fromSavepoint 
/path/to/savepoint] [--allowNonRestoredState] [job arguments]
     volumes:
       - /host/path/to/job/artifacts:/opt/flink/usrlib
     environment:
diff --git 
a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md 
b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
index c2759d45fa6..f050a19e21c 100644
--- a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
+++ b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md
@@ -124,7 +124,7 @@ The *job artifacts* could be provided by these way:
    The definition examples mount the volume as a local directory of the host 
assuming that you create the components in a minikube cluster.
    If you do not use a minikube cluster, you can use any other type of volume, 
available in your Kubernetes cluster, to supply the *job artifacts*.
 *  You can build [a custom image]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#advanced-customization) which already contains the artifacts instead.
-*  You can specify [--jar file]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#jobmanager-additional-command-line-arguments)arguments to point the job 
artifacts stored in [flink filesystem]({{< ref 
"docs/deployment/filesystems/overview" >}}) or Http/Https.
+*  You can use the [--jars]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments) option to pass artifacts that are stored locally, on a [remote DFS]({{< ref "docs/deployment/filesystems/overview" >}}), or accessible via an HTTP(S) endpoint.
 
 After creating [the common cluster 
components](#common-cluster-resource-definitions), use [the Application cluster 
specific resource definitions](#application-cluster-resource-definitions) to 
launch the cluster with the `kubectl` command:
 
@@ -613,7 +613,7 @@ spec:
         - name: jobmanager
           image: apache/flink:{{< stable >}}{{< version >}}-scala{{< 
scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
           env:
-          args: ["standalone-job", "--job-classname", "com.job.ClassName", 
<optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job 
id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", 
"--jar-file", "/path/to/user-artifact"]
+          args: ["standalone-job", "--job-classname", "com.job.ClassName", 
<optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job 
id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", 
"/path/to/savepoint", "--allowNonRestoredState"]
           ports:
             - containerPort: 6123
               name: rpc
@@ -672,7 +672,7 @@ spec:
                 apiVersion: v1
                 fieldPath: status.podIP
           # The following args overwrite the value of jobmanager.rpc.address 
configured in the configuration config map to POD_IP.
-          args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", 
"com.job.ClassName", <optional arguments>, <job arguments>] # optional 
arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", 
"--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"]
+          args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", 
"com.job.ClassName", <optional arguments>, <job arguments>] # optional 
arguments: ["--job-id", "<job id>", "--jars", 
"/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", 
"/path/to/savepoint", "--allowNonRestoredState"]
           ports:
             - containerPort: 6123
               name: rpc
diff --git 
a/docs/content/docs/deployment/resource-providers/standalone/overview.md 
b/docs/content/docs/deployment/resource-providers/standalone/overview.md
index 1c0b4b94ea9..7c37f2789e1 100644
--- a/docs/content/docs/deployment/resource-providers/standalone/overview.md
+++ b/docs/content/docs/deployment/resource-providers/standalone/overview.md
@@ -96,13 +96,25 @@ Then, we can launch the JobManager:
 $ ./bin/standalone-job.sh start --job-classname 
org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
 ```
 
-The web interface is now available at [localhost:8081](http://localhost:8081). 
However, the application won't be able to start, because there are no 
TaskManagers running yet:
+The web interface is now available at [localhost:8081](http://localhost:8081).
+
+{{< hint info >}}
+Another approach would be to use the artifact fetching mechanism via the 
`--jars` option:
+
+```bash
+$ ./bin/standalone-job.sh start -D 
user.artifacts.base-dir=/tmp/flink-artifacts --jars 
local:///path/to/TopSpeedWindowing.jar
+```
+
+Read more about this CLI option [here]({{< ref 
"docs/deployment/resource-providers/standalone/docker" 
>}}#jobmanager-additional-command-line-arguments).
+{{< /hint >}}
+
+However, the application won't be able to start, because there are no 
TaskManagers running yet:
 
 ```bash
 $ ./bin/taskmanager.sh start
 ```
 
-Note: You can start multiple TaskManagers, if your application needs more 
resources.
+<span class="label label-info">Note</span> You can start multiple TaskManagers if your application needs more resources.
 
 Stopping the services is also supported via the scripts. Call them multiple 
times if you want to stop multiple instances, or use `stop-all`:
 
diff --git 
a/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html 
b/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html
index be2cdb4a324..c97d999fccb 100644
--- a/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html
+++ b/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html
@@ -9,16 +9,28 @@
     </thead>
     <tbody>
         <tr>
-            <td><h5>user.artifacts.base.dir</h5></td>
+            <td><h5>user.artifacts.artifact-list</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>List&lt;String&gt;</td>
+            <td>A semicolon-separated list of the additional artifacts to 
fetch for the job before setting up the application cluster. All given elements 
have to be valid URIs. Example: 
s3://sandbox-bucket/format.jar;http://sandbox-server:1234/udf.jar</td>
+        </tr>
+        <tr>
+            <td><h5>user.artifacts.base-dir</h5></td>
             <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td>
             <td>String</td>
             <td>The base dir to put the application job artifacts.</td>
         </tr>
         <tr>
-            <td><h5>user.artifacts.http.header</h5></td>
+            <td><h5>user.artifacts.http-headers</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Map</td>
-            <td>Custom HTTP header for HttpArtifactFetcher. The header will be 
applied when getting the application job artifacts. Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.</td>
+            <td>Custom HTTP header(s) for the HTTP artifact fetcher. The 
header(s) will be applied when getting the application job artifacts. Expected 
format: headerKey1:headerValue1,headerKey2:headerValue2.</td>
+        </tr>
+        <tr>
+            <td><h5>user.artifacts.raw-http-enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enables artifact fetching from raw HTTP endpoints.</td>
         </tr>
     </tbody>
 </table>
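
Taken together, the renamed options documented in the table above could appear in a Flink configuration roughly like this (the bucket, host, and header values are placeholders, not part of this change):

```yaml
# Illustrative sketch only; all URIs and header values are placeholders.
user.artifacts.base-dir: /opt/flink/artifacts
user.artifacts.artifact-list: s3://sandbox-bucket/format.jar;http://sandbox-server:1234/udf.jar
# Needed here because the artifact list contains a plain http:// URI.
user.artifacts.raw-http-enabled: true
user.artifacts.http-headers: headerKey1:headerValue1,headerKey2:headerValue2
```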
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html 
b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
index a0c530045cf..7bbdfd5e404 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html
@@ -290,11 +290,5 @@
             <td>Integer</td>
             <td>Defines the number of Kubernetes transactional operation 
retries before the client gives up. For example, <code 
class="highlighter-rouge">FlinkKubeClient#checkAndUpdateConfigMap</code>.</td>
         </tr>
-        <tr>
-            <td><h5>kubernetes.user.artifacts.emptyDir.enable</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Whether to enable create mount an empty dir for <code 
class="highlighter-rouge">user.artifacts.base.dir</code> to keep user artifacts 
if container restart.</td>
-        </tr>
     </tbody>
 </table>
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index d80b06a1e6b..6f654a2c3f8 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -118,6 +118,14 @@ under the License.
                        <scope>test</scope>
                        <classifier>job-lib-jar</classifier>
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-clients-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <classifier>additional-artifact-jar</classifier>
+               </dependency>
        </dependencies>
 
        <!-- More information on this:
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java
index 2ea2fc4980a..2e634fcccb2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java
@@ -21,22 +21,40 @@ package org.apache.flink.client.cli;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.configuration.ConfigOptions.key;
+
 /** Artifact Fetch options. */
 public class ArtifactFetchOptions {
 
-    public static final ConfigOption<String> USER_ARTIFACTS_BASE_DIR =
-            ConfigOptions.key("user.artifacts.base.dir")
+    public static final ConfigOption<String> BASE_DIR =
+            ConfigOptions.key("user.artifacts.base-dir")
                     .stringType()
                     .defaultValue("/opt/flink/artifacts")
                     .withDescription("The base dir to put the application job 
artifacts.");
 
-    public static final ConfigOption<Map<String, String>> 
USER_ARTIFACT_HTTP_HEADER =
-            ConfigOptions.key("user.artifacts.http.header")
+    public static final ConfigOption<List<String>> ARTIFACT_LIST =
+            key("user.artifacts.artifact-list")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "A semicolon-separated list of the additional 
artifacts to fetch for the job before setting up the application cluster."
+                                    + " All given elements have to be valid 
URIs. Example: 
s3://sandbox-bucket/format.jar;http://sandbox-server:1234/udf.jar";);
+
+    public static final ConfigOption<Boolean> RAW_HTTP_ENABLED =
+            ConfigOptions.key("user.artifacts.raw-http-enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Enables artifact fetching from raw HTTP 
endpoints.");
+
+    public static final ConfigOption<Map<String, String>> HTTP_HEADERS =
+            ConfigOptions.key("user.artifacts.http-headers")
                     .mapType()
                     .noDefaultValue()
                     .withDescription(
-                            "Custom HTTP header for HttpArtifactFetcher. The 
header will be applied when getting the application job artifacts. "
-                                    + "Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
+                            "Custom HTTP header(s) for the HTTP artifact 
fetcher. The header(s) will be applied when getting the application job 
artifacts."
+                                    + " Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
index cb54d00dac3..654db0200c1 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionUtils;
 
 import javax.annotation.Nullable;
@@ -36,12 +35,17 @@ import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * {@code PackageProgramRetrieverImpl} is the default implementation of {@link
@@ -56,17 +60,8 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
     private final Configuration configuration;
 
     /**
-     * Creates a {@code PackageProgramRetrieverImpl} with the given parameters.
-     *
-     * @param userLibDir The user library directory that is used for 
generating the user classpath
-     *     if specified. The system classpath is used if not specified.
-     * @param jobClassName The job class that will be used if specified. The 
classpath is used to
-     *     detect any main class if not specified.
-     * @param programArgs The program arguments.
-     * @param configuration The Flink configuration for the given job.
-     * @return The {@code PackageProgramRetrieverImpl} that can be used to 
create a {@link
-     *     PackagedProgram} instance.
-     * @throws FlinkException If something goes wrong during instantiation.
+     * See {@link DefaultPackagedProgramRetriever#create(File, File, Collection, String, String[], Configuration)}.
      */
     public static DefaultPackagedProgramRetriever create(
             @Nullable File userLibDir,
@@ -77,6 +72,20 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
         return create(userLibDir, null, jobClassName, programArgs, 
configuration);
     }
 
+    /**
+     * See {@link DefaultPackagedProgramRetriever#create(File, File, Collection, String, String[], Configuration)}.
+     */
+    public static DefaultPackagedProgramRetriever create(
+            @Nullable File userLibDir,
+            @Nullable File jarFile,
+            @Nullable String jobClassName,
+            String[] programArgs,
+            Configuration configuration)
+            throws FlinkException {
+        return create(userLibDir, jarFile, null, jobClassName, programArgs, 
configuration);
+    }
+
     /**
      * Creates a {@code PackageProgramRetrieverImpl} with the given parameters.
      *
@@ -84,6 +93,8 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
      *     if specified. The system classpath is used if not specified.
      * @param jarFile The jar archive expected to contain the job class 
included; {@code null} if
      *     the job class is on the system classpath.
+     * @param userArtifacts The user artifacts that should be added to the 
user classpath if
+     *     specified.
      * @param jobClassName The job class to use; if {@code null} the user 
classpath (or, if not set,
      *     the system classpath) will be scanned for possible main class.
      * @param programArgs The program arguments.
@@ -95,18 +106,23 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
     public static DefaultPackagedProgramRetriever create(
             @Nullable File userLibDir,
             @Nullable File jarFile,
+            @Nullable Collection<File> userArtifacts,
             @Nullable String jobClassName,
             String[] programArgs,
             Configuration configuration)
             throws FlinkException {
         List<URL> userClasspaths;
         try {
-            final List<URL> classpathsFromUserLibDir = 
getClasspathsFromUserLibDir(userLibDir);
+            final List<URL> classpathsFromUserLibDir =
+                    getClasspathsFromUserDir(userLibDir, jarFile);
+            final List<URL> classpathsFromUserArtifactDir =
+                    getClasspathsFromArtifacts(userArtifacts, jarFile);
             final List<URL> classpathsFromConfiguration =
                     getClasspathsFromConfiguration(configuration);
 
             final List<URL> classpaths = new ArrayList<>();
             classpaths.addAll(classpathsFromUserLibDir);
+            classpaths.addAll(classpathsFromUserArtifactDir);
             classpaths.addAll(classpathsFromConfiguration);
 
             userClasspaths = Collections.unmodifiableList(classpaths);
@@ -116,7 +132,7 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
 
         final EntryClassInformationProvider entryClassInformationProvider =
                 createEntryClassInformationProvider(
-                        userLibDir == null ? null : userClasspaths,
+                        (userLibDir == null && userArtifacts == null) ? null : 
userClasspaths,
                         jarFile,
                         jobClassName,
                         programArgs);
@@ -185,13 +201,12 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
             List<URL> userClasspath,
             Configuration configuration) {
         this.entryClassInformationProvider =
-                Preconditions.checkNotNull(
+                checkNotNull(
                         entryClassInformationProvider, "No 
EntryClassInformationProvider passed.");
         this.programArguments =
-                Preconditions.checkNotNull(programArguments, "No program 
parameter array passed.");
-        this.userClasspath = Preconditions.checkNotNull(userClasspath, "No 
user classpath passed.");
-        this.configuration =
-                Preconditions.checkNotNull(configuration, "No Flink 
configuration was passed.");
+                checkNotNull(programArguments, "No program parameter array 
passed.");
+        this.userClasspath = checkNotNull(userClasspath, "No user classpath 
passed.");
+        this.configuration = checkNotNull(configuration, "No Flink 
configuration was passed.");
     }
 
     @Override
@@ -216,15 +231,34 @@ public class DefaultPackagedProgramRetriever implements 
PackagedProgramRetriever
         }
     }
 
-    private static List<URL> getClasspathsFromUserLibDir(@Nullable File 
userLibDir)
-            throws IOException {
-        if (userLibDir == null) {
+    private static List<URL> getClasspathsFromUserDir(
+            @Nullable File userDir, @Nullable File jarFile) throws IOException 
{
+        if (userDir == null) {
             return Collections.emptyList();
         }
 
+        try (Stream<Path> files = Files.list(userDir.toPath())) {
+            return getClasspathsFromArtifacts(files, jarFile);
+        }
+    }
+
+    private static List<URL> getClasspathsFromArtifacts(
+            @Nullable Collection<File> userArtifacts, @Nullable File jarFile) {
+        if (userArtifacts == null) {
+            return Collections.emptyList();
+        }
+
+        return 
getClasspathsFromArtifacts(userArtifacts.stream().map(File::toPath), jarFile);
+    }
+
+    private static List<URL> getClasspathsFromArtifacts(
+            Stream<Path> userArtifacts, @Nullable File jarFile) {
+        checkNotNull(userArtifacts);
+
         final Path workingDirectory = FileUtils.getCurrentWorkingDirectory();
         final List<URL> relativeJarURLs =
-                FileUtils.listFilesInDirectory(userLibDir.toPath(), 
FileUtils::isJarFile).stream()
+                userArtifacts
+                        .filter(path -> FileUtils.isJarFile(path) && 
!path.toFile().equals(jarFile))
                         .map(path -> 
FileUtils.relativizePath(workingDirectory, path))
                         .map(FunctionUtils.uncheckedFunction(FileUtils::toURL))
                         .collect(Collectors.toList());
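
The jar filtering that the new `getClasspathsFromArtifacts` overload performs can be sketched standalone with stdlib types only. This approximates `FileUtils.isJarFile` with a simple suffix check; the class name, helper name, and paths are made up for the illustration:

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ClasspathFilterSketch {

    // Keeps jar files from the fetched artifacts but drops the job jar itself,
    // mirroring the `FileUtils.isJarFile(path) && !path.toFile().equals(jarFile)`
    // filter so the job jar is not duplicated on the user classpath.
    static List<String> userClasspath(List<File> artifacts, File jobJar) {
        return artifacts.stream()
                .filter(f -> f.getName().endsWith(".jar") && !f.equals(jobJar))
                .map(File::getPath)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        File jobJar = new File("/opt/flink/artifacts/job.jar");
        List<File> fetched =
                Arrays.asList(jobJar, new File("/opt/flink/artifacts/udf.jar"));
        // Only the additional artifact survives the filter.
        System.out.println(userClasspath(fetched, jobJar));
    }
}
```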
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java
new file mode 100644
index 00000000000..20d491433bd
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.artifact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.client.program.DefaultPackagedProgramRetriever;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.apache.commons.io.FilenameUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Class that manages the artifact loading process. */
+public class ArtifactFetchManager {
+
+    private final ArtifactFetcher localFetcher;
+    private final ArtifactFetcher fsFetcher;
+    private final ArtifactFetcher httpFetcher;
+
+    private final Configuration conf;
+    private final File baseDir;
+
+    public ArtifactFetchManager(Configuration conf) {
+        this(conf, null);
+    }
+
+    public ArtifactFetchManager(Configuration conf, @Nullable String baseDir) {
+        this(
+                new LocalArtifactFetcher(),
+                new FsArtifactFetcher(),
+                new HttpArtifactFetcher(),
+                conf,
+                baseDir);
+    }
+
+    @VisibleForTesting
+    ArtifactFetchManager(
+            ArtifactFetcher localFetcher,
+            ArtifactFetcher fsFetcher,
+            ArtifactFetcher httpFetcher,
+            Configuration conf,
+            @Nullable String baseDir) {
+        this.localFetcher = checkNotNull(localFetcher);
+        this.fsFetcher = checkNotNull(fsFetcher);
+        this.httpFetcher = checkNotNull(httpFetcher);
+        this.conf = checkNotNull(conf);
+        this.baseDir =
+                baseDir == null
+                        ? new File(conf.get(ArtifactFetchOptions.BASE_DIR))
+                        : new File(baseDir);
+    }
+
+    /**
+     * Fetches artifacts from a given URI string array. The job jar and any additional artifacts are
+     * mixed; in case of multiple artifacts, the {@link DefaultPackagedProgramRetriever} logic will
+     * be used to find the job jar.
+     *
+     * @param uris URIs to fetch
+     * @return result with the fetched artifacts
+     */
+    public Result fetchArtifacts(String[] uris) {
+        checkArgument(uris != null && uris.length > 0, "At least one URI is 
required.");
+
+        ArtifactUtils.createMissingParents(baseDir);
+        List<File> artifacts =
+                Arrays.stream(uris)
+                        
.map(FunctionUtils.uncheckedFunction(this::fetchArtifact))
+                        .collect(Collectors.toList());
+
+        if (artifacts.size() > 1) {
+            return new Result(null, artifacts);
+        }
+
+        if (artifacts.size() == 1) {
+            return new Result(artifacts.get(0), null);
+        }
+
+        // Should not happen.
+        throw new IllegalStateException("Corrupt artifact fetching state.");
+    }
+
+    /**
+     * Fetches the job jar, and any additional artifacts if the given list is neither null nor empty.
+     *
+     * @param jobUri URI of the job jar
+     * @param additionalUris URI(s) of any additional artifact to fetch
+     * @return result with the fetched artifacts
+     * @throws Exception in case an artifact cannot be fetched
+     */
+    public Result fetchArtifacts(String jobUri, @Nullable List<String> 
additionalUris)
+            throws Exception {
+        checkArgument(jobUri != null && !jobUri.trim().isEmpty(), "The jobUri 
is required.");
+
+        ArtifactUtils.createMissingParents(baseDir);
+        File jobJar = fetchArtifact(jobUri);
+
+        List<File> additionalArtifacts =
+                additionalUris == null
+                        ? Collections.emptyList()
+                        : additionalUris.stream()
+                                
.map(FunctionUtils.uncheckedFunction(this::fetchArtifact))
+                                .collect(Collectors.toList());
+
+        return new Result(jobJar, additionalArtifacts);
+    }
+
+    @VisibleForTesting
+    ArtifactFetcher getFetcher(URI uri) {
+        if ("local".equals(uri.getScheme())) {
+            return localFetcher;
+        }
+
+        if (isRawHttp(uri.getScheme()) || "https".equals(uri.getScheme())) {
+            return httpFetcher;
+        }
+
+        return fsFetcher;
+    }
+
+    private File fetchArtifact(String uri) throws Exception {
+        URI resolvedUri = PackagedProgramUtils.resolveURI(uri);
+        File targetFile = new File(baseDir, 
FilenameUtils.getName(resolvedUri.getPath()));
+        if (targetFile.exists()) {
+            // Already fetched user artifacts are kept.
+            return targetFile;
+        }
+
+        return getFetcher(resolvedUri).fetch(uri, conf, baseDir);
+    }
+
+    private boolean isRawHttp(String uriScheme) {
+        if ("http".equals(uriScheme)) {
+            if (conf.getBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED)) {
+                return true;
+            }
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Artifact fetching from raw HTTP endpoints are 
disabled. Set the '%s' property to override.",
+                            ArtifactFetchOptions.RAW_HTTP_ENABLED.key()));
+        }
+
+        return false;
+    }
+
+    /** Artifact fetch result with all fetched artifact(s). */
+    public static class Result {
+
+        private final File jobJar;
+        private final List<File> artifacts;
+
+        private Result(@Nullable File jobJar, @Nullable List<File> 
additionalJars) {
+            this.jobJar = jobJar;
+            this.artifacts = additionalJars == null ? Collections.emptyList() 
: additionalJars;
+        }
+
+        @Nullable
+        public File getJobJar() {
+            return jobJar;
+        }
+
+        @Nullable
+        public List<File> getArtifacts() {
+            return artifacts.isEmpty() ? null : 
Collections.unmodifiableList(artifacts);
+        }
+    }
+}
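
The scheme-based dispatch in `getFetcher` above can be illustrated with plain `java.net.URI` parsing. This is a stdlib-only sketch (class and method names are invented for the example, and the `user.artifacts.raw-http-enabled` gate is omitted):

```java
import java.net.URI;

public class SchemeDispatchSketch {

    // Picks a fetcher kind the way ArtifactFetchManager#getFetcher does:
    // "local" URIs are resolved in place, HTTP(S) goes to the HTTP fetcher,
    // and everything else (s3://, hdfs://, ...) falls back to the
    // filesystem-plugin fetcher.
    static String fetcherFor(String uri) {
        String scheme = URI.create(uri).getScheme();
        if ("local".equals(scheme)) {
            return "local";
        }
        if ("http".equals(scheme) || "https".equals(scheme)) {
            return "http";
        }
        return "fs";
    }

    public static void main(String[] args) {
        System.out.println(fetcherFor("local:///opt/flink/usrlib/job.jar"));
        System.out.println(fetcherFor("https://sandbox-server:1234/udf.jar"));
        System.out.println(fetcherFor("s3://sandbox-bucket/format.jar"));
    }
}
```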
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java
index 8d43998079b..0740a2e6b4b 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java
@@ -22,8 +22,8 @@ import org.apache.flink.configuration.Configuration;
 
 import java.io.File;
 
-/** The artifact fetcher. */
-public interface ArtifactFetcher {
+/** Abstract artifact fetcher. */
+abstract class ArtifactFetcher {
 
     /**
      * Fetch the resource from the uri to the targetDir.
@@ -34,5 +34,6 @@ public interface ArtifactFetcher {
      * @return The path of the fetched artifact.
      * @throws Exception Error during fetching the artifact.
      */
-    File fetch(String uri, Configuration flinkConfiguration, File targetDir) 
throws Exception;
+    abstract File fetch(String uri, Configuration flinkConfiguration, File 
targetDir)
+            throws Exception;
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java
index aa1d6803a50..11249b65d98 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java
@@ -17,55 +17,43 @@
 
 package org.apache.flink.client.program.artifact;
 
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.FilenameUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.net.URI;
 
-/** Manage the user artifacts. */
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Artifact fetch related utils. */
 public class ArtifactUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ArtifactUtils.class);
 
-    private static synchronized void createIfNotExists(File targetDir) {
-        if (!targetDir.exists()) {
+    /**
+     * Creates missing parent directories for the given {@link File} if there 
are any. Does nothing
+     * otherwise.
+     *
+     * @param baseDir base dir to create parents for
+     */
+    public static synchronized void createMissingParents(File baseDir) {
+        checkNotNull(baseDir, "Base dir has to be provided.");
+
+        if (!baseDir.exists()) {
             try {
-                FileUtils.forceMkdirParent(targetDir);
-                LOG.info("Created dir: {}", targetDir);
+                FileUtils.forceMkdirParent(baseDir);
+                LOG.info("Created parents for base dir: {}", baseDir);
             } catch (Exception e) {
                 throw new FlinkRuntimeException(
-                        String.format("Failed to create the dir: %s", 
targetDir), e);
+                        String.format("Failed to create parent(s) for given 
base dir: %s", baseDir),
+                        e);
             }
         }
     }
 
-    public static File fetch(String jarURI, Configuration flinkConfiguration, 
String targetDirStr)
-            throws Exception {
-        URI uri = PackagedProgramUtils.resolveURI(jarURI);
-        if ("local".equals(uri.getScheme()) && uri.isAbsolute()) {
-            return new File(uri.getPath());
-        } else {
-            File targetDir = new File(targetDirStr);
-            File targetFile = new File(targetDir, 
FilenameUtils.getName(uri.getPath()));
-            // user artifacts will be kept if enable emptyDir
-            if (!targetFile.exists()) {
-                createIfNotExists(targetDir);
-                if ("http".equals(uri.getScheme()) || 
"https".equals(uri.getScheme())) {
-                    return HttpArtifactFetcher.INSTANCE.fetch(
-                            jarURI, flinkConfiguration, targetDir);
-                } else {
-                    return FileSystemBasedArtifactFetcher.INSTANCE.fetch(
-                            jarURI, flinkConfiguration, targetDir);
-                }
-            }
-            return targetFile;
-        }
+    private ArtifactUtils() {
+        throw new UnsupportedOperationException("This class should never be 
instantiated.");
     }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FileSystemBasedArtifactFetcher.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FsArtifactFetcher.java
similarity index 75%
rename from 
flink-clients/src/main/java/org/apache/flink/client/program/artifact/FileSystemBasedArtifactFetcher.java
rename to 
flink-clients/src/main/java/org/apache/flink/client/program/artifact/FsArtifactFetcher.java
index ebf2bd330a4..caa40858fe2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FileSystemBasedArtifactFetcher.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FsArtifactFetcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program.artifact;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -27,17 +28,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
-/** Leverage the flink filesystem plugin to fetch the artifact. */
-public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
+/** Copies an artifact via the Flink filesystem plugin. */
+class FsArtifactFetcher extends ArtifactFetcher {
 
-    public static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBasedArtifactFetcher.class);
-    public static final FileSystemBasedArtifactFetcher INSTANCE =
-            new FileSystemBasedArtifactFetcher();
+    private static final Logger LOG = 
LoggerFactory.getLogger(FsArtifactFetcher.class);
 
     @Override
-    public File fetch(String uri, Configuration flinkConfiguration, File 
targetDir)
-            throws Exception {
-        org.apache.flink.core.fs.Path source = new 
org.apache.flink.core.fs.Path(uri);
+    File fetch(String uri, Configuration flinkConf, File targetDir) throws 
Exception {
+        Path source = new Path(uri);
         long start = System.currentTimeMillis();
         FileSystem fileSystem = source.getFileSystem();
         String fileName = source.getName();
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java
index 88ef921e863..70303f85f77 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java
@@ -26,25 +26,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Map;
 
-/** Download the jar from the http resource. */
-public class HttpArtifactFetcher implements ArtifactFetcher {
+/** Downloads an artifact from an HTTP resource. */
+class HttpArtifactFetcher extends ArtifactFetcher {
 
     public static final Logger LOG = 
LoggerFactory.getLogger(HttpArtifactFetcher.class);
-    public static final HttpArtifactFetcher INSTANCE = new 
HttpArtifactFetcher();
 
     @Override
-    public File fetch(String uri, Configuration flinkConfiguration, File 
targetDir)
-            throws Exception {
+    File fetch(String uri, Configuration flinkConf, File targetDir) throws 
IOException {
         long start = System.currentTimeMillis();
         URL url = new URL(uri);
         HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        Map<String, String> headers =
-                
flinkConfiguration.get(ArtifactFetchOptions.USER_ARTIFACT_HTTP_HEADER);
+        Map<String, String> headers = 
flinkConf.get(ArtifactFetchOptions.HTTP_HEADERS);
 
         if (headers != null) {
             headers.forEach(conn::setRequestProperty);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/LocalArtifactFetcher.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/LocalArtifactFetcher.java
new file mode 100644
index 00000000000..41c13c64bb7
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/LocalArtifactFetcher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.artifact;
+
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.configuration.Configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+
+/** Retrieves a local artifact as a valid {@link File}. */
+class LocalArtifactFetcher extends ArtifactFetcher {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LocalArtifactFetcher.class);
+
+    @Override
+    File fetch(String uri, Configuration flinkConf, File targetDir) throws 
Exception {
+        URI resolvedUri = PackagedProgramUtils.resolveURI(uri);
+        File targetFile = new File(resolvedUri.getPath());
+        LOG.debug("Retrieved local file from {} as {}", uri, targetFile);
+
+        return targetFile;
+    }
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
index f8cb80e32b4..431bba61613 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java
@@ -45,6 +45,7 @@ import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -70,6 +71,10 @@ class DefaultPackagedProgramRetrieverITCase {
     ClasspathProviderExtension testJobEntryClassClasspathProvider =
             ClasspathProviderExtension.createWithTestJobOnly();
 
+    @RegisterExtension
+    ClasspathProviderExtension additionalArtifactClasspathProvider =
+            ClasspathProviderExtension.createWithAdditionalArtifact();
+
     @Test
     void testDeriveEntryClassInformationForCustomJar()
             throws FlinkException, MalformedURLException {
@@ -319,18 +324,33 @@ class DefaultPackagedProgramRetrieverITCase {
 
     @Test
     void testWithoutJobClassAndMultipleEntryClassesOnUserClasspath() {
+        // without a job class name specified deriving the entry class from classpath is impossible
+        // if the classpath contains multiple classes with main methods
+        final String errorMsg = "Multiple JAR archives with entry classes found on classpath.";
         assertThatThrownBy(
-                        () -> {
-                            // without a job class name specified deriving the entry class from
-                            // classpath is impossible
-                            // if the classpath contains multiple classes with main methods
-                            DefaultPackagedProgramRetriever.create(
-                                    multipleEntryClassesClasspathProvider.getDirectory(),
-                                    null,
-                                    new String[0],
-                                    new Configuration());
-                        })
-                .isInstanceOf(FlinkException.class);
+                        () ->
+                                DefaultPackagedProgramRetriever.create(
+                                        multipleEntryClassesClasspathProvider.getDirectory(),
+                                        null,
+                                        new String[0],
+                                        new Configuration()))
+                .isInstanceOf(FlinkException.class)
+                .hasMessageContaining(errorMsg);
+
+        assertThatThrownBy(
+                        () ->
+                                DefaultPackagedProgramRetriever.create(
+                                        null,
+                                        null,
+                                        Arrays.asList(
+                                                multipleEntryClassesClasspathProvider
+                                                        .getDirectory()
+                                                        .listFiles()),
+                                        null,
+                                        new String[0],
+                                        new Configuration()))
+                .isInstanceOf(FlinkException.class)
+                .hasMessageContaining(errorMsg);
     }
 
     @Test
@@ -475,6 +495,89 @@ class DefaultPackagedProgramRetrieverITCase {
         assertThat(actualClasspath).isEqualTo(expectedClasspath);
     }
 
+    @Test
+    void testRetrieveFromJarFileWithArtifacts()
+            throws IOException, FlinkException, ProgramInvocationException {
+        final PackagedProgramRetriever retrieverUnderTest =
+                DefaultPackagedProgramRetriever.create(
+                        null,
+                        // the testJob jar is not on the user classpath
+                        testJobEntryClassClasspathProvider.getJobJar(),
+                        Arrays.asList(
+                                additionalArtifactClasspathProvider.getDirectory().listFiles()),
+                        null,
+                        ClasspathProviderExtension.parametersForTestJob("suffix"),
+                        new Configuration());
+        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
+
+        assertThat(jobGraph.getUserJars())
+                .contains(
+                        new org.apache.flink.core.fs.Path(
+                                testJobEntryClassClasspathProvider.getJobJar().toURI()));
+        final List<String> actualClasspath =
+                jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+        final List<String> expectedClasspath =
+                extractRelativizedURLsForJarsFromDirectory(
+                        additionalArtifactClasspathProvider.getDirectory());
+
+        assertThat(actualClasspath).isEqualTo(expectedClasspath);
+    }
+
+    @Test
+    void testRetrieveFromJarFileWithUserAndArtifactLib()
+            throws IOException, FlinkException, ProgramInvocationException {
+        final PackagedProgramRetriever retrieverUnderTest =
+                DefaultPackagedProgramRetriever.create(
+                        singleEntryClassClasspathProvider.getDirectory(),
+                        // the testJob jar is not on the user classpath
+                        testJobEntryClassClasspathProvider.getJobJar(),
+                        Arrays.asList(
+                                additionalArtifactClasspathProvider.getDirectory().listFiles()),
+                        null,
+                        ClasspathProviderExtension.parametersForTestJob("suffix"),
+                        new Configuration());
+        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
+
+        assertThat(jobGraph.getUserJars())
+                .contains(
+                        new org.apache.flink.core.fs.Path(
+                                testJobEntryClassClasspathProvider.getJobJar().toURI()));
+        final List<String> actualClasspath =
+                jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+        final List<String> expectedClasspath = new ArrayList<>();
+        expectedClasspath.addAll(
+                extractRelativizedURLsForJarsFromDirectory(
+                        singleEntryClassClasspathProvider.getDirectory()));
+        expectedClasspath.addAll(
+                extractRelativizedURLsForJarsFromDirectory(
+                        additionalArtifactClasspathProvider.getDirectory()));
+
+        assertThat(actualClasspath).isEqualTo(expectedClasspath);
+    }
+
+    @Test
+    void testRetrieveFromArtifactLibWithoutJarFile()
+            throws IOException, FlinkException, ProgramInvocationException {
+        final PackagedProgramRetriever retrieverUnderTest =
+                DefaultPackagedProgramRetriever.create(
+                        null,
+                        null,
+                        Arrays.asList(
+                                multipleEntryClassesClasspathProvider.getDirectory().listFiles()),
+                        multipleEntryClassesClasspathProvider.getJobClassName(),
+                        ClasspathProviderExtension.parametersForTestJob("suffix"),
+                        new Configuration());
+        final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration());
+
+        final List<String> actualClasspath =
+                jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList());
+        final List<String> expectedClasspath =
+                extractRelativizedURLsForJarsFromDirectory(
+                        multipleEntryClassesClasspathProvider.getDirectory());
+
+        assertThat(actualClasspath).isEqualTo(expectedClasspath);
+    }
+
     @Test
     void testChildFirstDefaultConfiguration() throws FlinkException {
         // this is a sanity check to backup testConfigurationIsConsidered
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java
new file mode 100644
index 00000000000..f41c8237a31
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.artifact;
+
+import org.apache.flink.client.cli.ArtifactFetchOptions;
+import org.apache.flink.configuration.Configuration;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.BindException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ArtifactFetchManager}. */
+class ArtifactFetchManagerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ArtifactFetchManagerTest.class);
+
+    @TempDir private Path tempDir;
+
+    private Configuration configuration;
+
+    @BeforeEach
+    void setup() {
+        configuration = new Configuration();
+        configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString());
+    }
+
+    @Test
+    void testGetFetcher() throws Exception {
+        configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
+        ArtifactFetchManager fetchManager = new ArtifactFetchManager(configuration);
+
+        ArtifactFetcher fetcher = fetchManager.getFetcher(new URI("local:///a.jar"));
+        assertThat(fetcher).isInstanceOf(LocalArtifactFetcher.class);
+
+        fetcher = fetchManager.getFetcher(new URI("http://0.0.0.0:123/a.jar"));
+        assertThat(fetcher).isInstanceOf(HttpArtifactFetcher.class);
+
+        fetcher = fetchManager.getFetcher(new URI("https://0.0.0.0:123/a.jar"));
+        assertThat(fetcher).isInstanceOf(HttpArtifactFetcher.class);
+
+        fetcher = fetchManager.getFetcher(new URI("hdfs:///tmp/a.jar"));
+        assertThat(fetcher).isInstanceOf(FsArtifactFetcher.class);
+
+        fetcher = fetchManager.getFetcher(new URI("s3a:///tmp/a.jar"));
+        assertThat(fetcher).isInstanceOf(FsArtifactFetcher.class);
+    }
+
+    @Test
+    void testFileSystemFetchWithoutAdditionalUri() throws Exception {
+        File sourceFile = getDummyArtifact(getClass());
+        String uriStr = "file://" + sourceFile.toURI().getPath();
+
+        ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
+        ArtifactFetchManager.Result res = fetchMgr.fetchArtifacts(uriStr, null);
+        assertThat(res.getJobJar()).exists();
+        assertThat(res.getJobJar().getName()).isEqualTo(sourceFile.getName());
+        assertThat(res.getArtifacts()).isNull();
+    }
+
+    @Test
+    void testFileSystemFetchWithAdditionalUri() throws Exception {
+        File sourceFile = getDummyArtifact(getClass());
+        String uriStr = "file://" + sourceFile.toURI().getPath();
+        File additionalSrcFile = getFlinkClientsJar();
+        String additionalUriStr = "file://" + additionalSrcFile.toURI().getPath();
+
+        ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
+        ArtifactFetchManager.Result res =
+                fetchMgr.fetchArtifacts(uriStr, Collections.singletonList(additionalUriStr));
+        assertThat(res.getJobJar()).exists();
+        assertThat(res.getJobJar().getName()).isEqualTo(sourceFile.getName());
+        assertThat(res.getArtifacts()).hasSize(1);
+        File additionalFetched = res.getArtifacts().get(0);
+        assertThat(additionalFetched.getName()).isEqualTo(additionalSrcFile.getName());
+    }
+
+    @Test
+    void testHttpFetch() throws Exception {
+        configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true);
+        HttpServer httpServer = null;
+        try {
+            httpServer = startHttpServer();
+            File sourceFile = getFlinkClientsJar();
+            httpServer.createContext(
+                    "/download/" + sourceFile.getName(), new DummyHttpDownloadHandler(sourceFile));
+            String uriStr =
+                    String.format(
+                            "http://127.0.0.1:%d/download/" + sourceFile.getName(),
+                            httpServer.getAddress().getPort());
+
+            ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
+            ArtifactFetchManager.Result res = fetchMgr.fetchArtifacts(new String[] {uriStr});
+            assertThat(res.getJobJar()).isNotNull();
+            assertThat(res.getArtifacts()).isNull();
+            assertFetchedFile(res.getJobJar(), sourceFile);
+        } finally {
+            if (httpServer != null) {
+                httpServer.stop(0);
+            }
+        }
+    }
+
+    @Test
+    void testMixedArtifactFetch() throws Exception {
+        File sourceFile = getDummyArtifact(getClass());
+        String uriStr = "file://" + sourceFile.toURI().getPath();
+        File sourceFile2 = getFlinkClientsJar();
+        String uriStr2 = "file://" + sourceFile2.toURI().getPath();
+
+        ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
+        ArtifactFetchManager.Result res = fetchMgr.fetchArtifacts(new String[] {uriStr, uriStr2});
+        assertThat(res.getJobJar()).isNull();
+        assertThat(res.getArtifacts()).hasSize(2);
+        assertFetchedFile(res.getArtifacts().get(0), sourceFile);
+        assertFetchedFile(res.getArtifacts().get(1), sourceFile2);
+    }
+
+    @Test
+    void testNoFetchOverride() throws Exception {
+        DummyFetcher dummyFetcher = new DummyFetcher();
+        ArtifactFetchManager fetchMgr =
+                new ArtifactFetchManager(
+                        dummyFetcher, dummyFetcher, dummyFetcher, configuration, null);
+
+        File sourceFile = getDummyArtifact(getClass());
+        Path destFile = tempDir.resolve(sourceFile.getName());
+        Files.copy(sourceFile.toPath(), destFile);
+
+        String uriStr = "file://" + sourceFile.toURI().getPath();
+        fetchMgr.fetchArtifacts(uriStr, null);
+
+        assertThat(dummyFetcher.fetchCount).isZero();
+    }
+
+    @Test
+    void testHttpDisabledError() {
+        ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
+        assertThatThrownBy(
+                        () ->
+                                fetchMgr.fetchArtifacts(
+                                        "http://127.0.0.1:1234/download/notexists.jar", null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("raw HTTP endpoints are disabled");
+    }
+
+    @Test
+    void testMissingRequiredFetchArgs() {
+        ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration);
+        assertThatThrownBy(() -> fetchMgr.fetchArtifacts(null, null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("The jobUri is required.");
+
+        assertThatThrownBy(() -> fetchMgr.fetchArtifacts(null))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("At least one URI is required.");
+
+        assertThatThrownBy(() -> fetchMgr.fetchArtifacts(new String[0]))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage("At least one URI is required.");
+    }
+
+    private void assertFetchedFile(File actual, File expected) {
+        assertThat(actual).exists();
+        assertThat(actual.getName()).isEqualTo(expected.getName());
+        assertThat(actual.length()).isEqualTo(expected.length());
+    }
+
+    private HttpServer startHttpServer() throws IOException {
+        int port = RandomUtils.nextInt(2000, 3000);
+        HttpServer httpServer = null;
+        while (httpServer == null && port <= 65535) {
+            try {
+                httpServer = HttpServer.create(new InetSocketAddress(port), 0);
+                httpServer.setExecutor(null);
+                httpServer.start();
+            } catch (BindException e) {
+                LOG.warn("Failed to start http server", e);
+                port++;
+            }
+        }
+        return httpServer;
+    }
+
+    private File getDummyArtifact(Class<?> cls) {
+        String className = String.format("%s.class", cls.getSimpleName());
+        URL url = cls.getResource(className);
+        assertThat(url).isNotNull();
+
+        return new File(url.getPath());
+    }
+
+    private File getFlinkClientsJar() throws IOException {
+        String pathStr =
+                ArtifactFetchManager.class
+                        .getProtectionDomain()
+                        .getCodeSource()
+                        .getLocation()
+                        .getPath();
+        Path mvnTargetDir = Paths.get(pathStr).getParent();
+
+        Collection<Path> jarPaths =
+                org.apache.flink.util.FileUtils.listFilesInDirectory(
+                        mvnTargetDir,
+                        p ->
+                                org.apache.flink.util.FileUtils.isJarFile(p)
+                                        && p.toFile().getName().startsWith("flink-clients")
+                                        && !p.toFile().getName().contains("test-utils"));
+
+        assertThat(jarPaths).isNotEmpty();
+
+        return jarPaths.iterator().next().toFile();
+    }
+
+    private static class DummyHttpDownloadHandler implements HttpHandler {
+
+        final File file;
+
+        DummyHttpDownloadHandler(File fileToDownload) {
+            checkArgument(fileToDownload.exists(), "The file to download does not exist!");
+            this.file = fileToDownload;
+        }
+
+        @Override
+        public void handle(HttpExchange exchange) throws IOException {
+            exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
+            exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, file.length());
+            FileUtils.copyFile(this.file, exchange.getResponseBody());
+            exchange.close();
+        }
+    }
+
+    private static class DummyFetcher extends ArtifactFetcher {
+
+        int fetchCount = 0;
+
+        @Override
+        File fetch(String uri, Configuration flinkConfiguration, File targetDir) {
+            ++fetchCount;
+            return null;
+        }
+    }
+}
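[Editor's note] As the `testGetFetcher` case above exercises, the new `ArtifactFetchManager` selects a fetcher implementation from the URI scheme: `local:` maps to `LocalArtifactFetcher`, `http`/`https` to `HttpArtifactFetcher`, and any other scheme (`file`, `hdfs`, `s3a`, ...) to the Flink-FileSystem-backed `FsArtifactFetcher`. A minimal standalone sketch of that dispatch rule, for illustration only (`FetcherKind` and `dispatch` are hypothetical names, not part of the Flink API):

```java
import java.net.URI;

public class FetcherDispatchSketch {

    // Hypothetical stand-ins for the three fetcher classes added by this commit.
    enum FetcherKind { LOCAL, HTTP, FILE_SYSTEM }

    // Mirrors the scheme-based selection that testGetFetcher asserts.
    static FetcherKind dispatch(String uriStr) {
        URI uri = URI.create(uriStr); // unchecked parse, keeps the sketch simple
        String scheme = uri.getScheme() == null ? "file" : uri.getScheme().toLowerCase();
        switch (scheme) {
            case "local":
                return FetcherKind.LOCAL; // LocalArtifactFetcher: use the file in place
            case "http":
            case "https":
                return FetcherKind.HTTP; // HttpArtifactFetcher (requires raw HTTP to be enabled)
            default:
                return FetcherKind.FILE_SYSTEM; // FsArtifactFetcher: file, hdfs, s3a, ...
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch("local:///a.jar"));     // LOCAL
        System.out.println(dispatch("https://host/a.jar")); // HTTP
        System.out.println(dispatch("s3a:///tmp/a.jar"));   // FILE_SYSTEM
    }
}
```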
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java
index 3c19636e385..7441d804700 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java
@@ -17,130 +17,23 @@
 
 package org.apache.flink.client.program.artifact;
 
-import org.apache.flink.client.cli.ArtifactFetchOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Preconditions;
-
-import com.sun.net.httpserver.HttpExchange;
-import com.sun.net.httpserver.HttpHandler;
-import com.sun.net.httpserver.HttpServer;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
-import java.net.BindException;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.net.URL;
 import java.nio.file.Path;
-import java.util.HashMap;
 
-/** Test for {@link ArtifactUtils}. */
-public class ArtifactUtilsTest {
-    private static final Logger LOG = LoggerFactory.getLogger(ArtifactUtilsTest.class);
-    private Configuration configuration;
-    @TempDir Path tempDir;
+import static org.assertj.core.api.Assertions.assertThat;
 
-    @BeforeEach
-    public void setup() {
-        configuration = new Configuration();
-        configuration.setString(
-                ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, tempDir.toAbsolutePath().toString());
-    }
+/** Tests for {@link ArtifactUtils}. */
+class ArtifactUtilsTest {
 
     @Test
-    public void testFilesystemFetch() throws Exception {
-        File sourceFile = mockTheJarFile();
-        File file =
-                ArtifactUtils.fetch(
-                        String.format("file://%s", sourceFile.toURI().getPath()),
-                        configuration,
-                        tempDir.toString());
-        Assertions.assertTrue(file.exists());
-        Assertions.assertEquals(tempDir.toString(), file.getParentFile().toString());
-    }
-
-    @Test
-    public void testHttpFetch() throws Exception {
-        HttpServer httpServer = null;
-        try {
-            httpServer = startHttpServer();
-            File sourceFile = mockTheJarFile();
-            httpServer.createContext(
-                    "/download/major.jar", new DownloadFileHttpHandler(sourceFile));
-
-            File file =
-                    ArtifactUtils.fetch(
-                            String.format(
-                                    "http://127.0.0.1:%d/download/major.jar",
-                                    httpServer.getAddress().getPort()),
-                            new Configuration()
-                                    .set(
-                                            ArtifactFetchOptions.USER_ARTIFACT_HTTP_HEADER,
-                                            new HashMap<String, String>() {
-                                                {
-                                                    put("k1", "v1");
-                                                }
-                                            }),
-                            tempDir.toString());
-            Assertions.assertTrue(file.exists());
-            Assertions.assertEquals(tempDir.toString(), file.getParent());
-            Assertions.assertEquals("major.jar", file.getName());
-        } finally {
-            if (httpServer != null) {
-                httpServer.stop(0);
-            }
-        }
-    }
-
-    private HttpServer startHttpServer() throws IOException {
-        int port = RandomUtils.nextInt(2000, 3000);
-        HttpServer httpServer = null;
-        while (httpServer == null && port <= 65536) {
-            try {
-                httpServer = HttpServer.create(new InetSocketAddress(port), 0);
-                httpServer.setExecutor(null);
-                httpServer.start();
-            } catch (BindException e) {
-                LOG.warn("Failed to start http server", e);
-                port++;
-            }
-        }
-        return httpServer;
-    }
-
-    private File mockTheJarFile() {
-        String className = String.format("%s.class", ArtifactUtilsTest.class.getSimpleName());
-        URL url = ArtifactUtilsTest.class.getResource(className);
-        Assertions.assertNotNull(url);
-        return new File(url.getPath());
-    }
-
-    /** Handler to mock download file. */
-    public static class DownloadFileHttpHandler implements HttpHandler {
-
-        private final File file;
-        private final String contentType = "application/octet-stream";
-
-        public DownloadFileHttpHandler(File fileToDownload) {
-            Preconditions.checkArgument(
-                    fileToDownload.exists(), "The file to be download not exists!");
-            this.file = fileToDownload;
-        }
+    void testCreateMissingParents(@TempDir Path tempDir) {
+        File targetDir = tempDir.resolve("p1").resolve("p2").resolve("base-dir").toFile();
+        assertThat(targetDir.getParentFile().getParentFile()).doesNotExist();
 
-        @Override
-        public void handle(HttpExchange exchange) throws IOException {
-            exchange.getResponseHeaders().add("Content-Type", contentType);
-            exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, file.length());
-            FileUtils.copyFile(this.file, exchange.getResponseBody());
-            exchange.close();
-        }
+        ArtifactUtils.createMissingParents(targetDir);
+        assertThat(targetDir.getParentFile()).isDirectory();
     }
 }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java
index 46097bbf5ec..f53ae539774 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java
@@ -58,6 +58,9 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach
     private static final Path JOB_LIB_JAR_PATH =
             Paths.get("target", "flink-clients-test-utils-job-lib-jar.jar");
 
+    private static final Path ADDITIONAL_ARTIFACT_JAR_PATH =
+            Paths.get("target", "flink-clients-test-utils-additional-artifact-jar.jar");
+
     protected File temporaryFolder = org.assertj.core.util.Files.newTemporaryFolder();
 
     private final String directoryNameSuffix;
@@ -110,6 +113,14 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach
                 TEST_JOB_JAR_PATH.toFile());
     }
 
+    public static ClasspathProviderExtension createWithAdditionalArtifact() {
+        return new ClasspathProviderExtension(
+                "_user_dir_with_additional_artifact",
+                directory -> copyJar(ADDITIONAL_ARTIFACT_JAR_PATH, directory),
+                ADDITIONAL_ARTIFACT_JAR_PATH.toFile(),
+                null);
+    }
+
     public static String[] parametersForTestJob(String strValue) {
         return new String[] {"--arg", strValue};
     }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java
index e94ab2cda64..6f3da432815 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java
@@ -38,7 +38,7 @@ final class StandaloneApplicationClusterConfiguration extends EntrypointClusterC
 
     @Nullable private final String jobClassName;
 
-    @Nullable private final String jarFile;
+    @Nullable private final String[] jars;
 
     StandaloneApplicationClusterConfiguration(
             @Nonnull String configDir,
@@ -49,13 +49,17 @@ final class StandaloneApplicationClusterConfiguration extends EntrypointClusterC
             @Nonnull SavepointRestoreSettings savepointRestoreSettings,
             @Nullable JobID jobId,
             @Nullable String jobClassName,
-            @Nullable String jarFile) {
+            @Nullable String[] jars) {
         super(configDir, dynamicProperties, args, hostname, restPort);
         this.savepointRestoreSettings =
                 requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
         this.jobId = jobId;
         this.jobClassName = jobClassName;
-        this.jarFile = jarFile;
+        this.jars = jars;
+    }
+
+    boolean hasJars() {
+        return jars != null && jars.length > 0;
     }
 
     @Nonnull
@@ -74,7 +78,7 @@ final class StandaloneApplicationClusterConfiguration extends EntrypointClusterC
     }
 
     @Nullable
-    String getJarFile() {
-        return jarFile;
+    String[] getJars() {
+        return jars;
     }
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java
index f00c3eddc93..112a65dd18f 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java
@@ -63,20 +63,21 @@ public class StandaloneApplicationClusterConfigurationParserFactory
                     .desc("Job ID of the job to run.")
                     .build();
 
-    private static final Option JOB_JAR_FILE =
-            Option.builder("jarfile")
-                    .longOpt("jar-file")
+    private static final Option JARS_OPTION =
+            Option.builder("jars")
+                    .longOpt("jars")
                     .required(false)
-                    .hasArg(true)
-                    .argName("job jar file")
-                    .desc("Jar File of the job to run.")
+                    .hasArgs()
+                    .valueSeparator(',')
+                    .argName("jar file(s) for job")
+                    .desc("Jar file of the job to run.")
                     .build();
 
     @Override
     public Options getOptions() {
         final Options options = new Options();
         options.addOption(CONFIG_DIR_OPTION);
-        options.addOption(JOB_JAR_FILE);
+        options.addOption(JARS_OPTION);
         options.addOption(REST_PORT_OPTION);
         options.addOption(JOB_CLASS_NAME_OPTION);
         options.addOption(JOB_ID_OPTION);
@@ -100,7 +101,7 @@ public class StandaloneApplicationClusterConfigurationParserFactory
                 CliFrontendParser.createSavepointRestoreSettings(commandLine);
         final JobID jobId = getJobId(commandLine);
         final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
-        final String jarFile = 
commandLine.getOptionValue(JOB_JAR_FILE.getOpt());
+        final String[] jarFiles = 
commandLine.getOptionValues(JARS_OPTION.getOpt());
 
         return new StandaloneApplicationClusterConfiguration(
                 configDir,
@@ -111,7 +112,7 @@ public class StandaloneApplicationClusterConfigurationParserFactory
                 savepointRestoreSettings,
                 jobId,
                 jobClassName,
-                jarFile);
+                jarFiles);
     }
 
    private int getRestPort(CommandLine commandLine) throws FlinkParseException {
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
index 3b525394b1d..ca77c16768b 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java
@@ -21,14 +21,12 @@ package org.apache.flink.container.entrypoint;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.cli.ArtifactFetchOptions;
 import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint;
 import org.apache.flink.client.program.DefaultPackagedProgramRetriever;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramRetriever;
-import org.apache.flink.client.program.artifact.ArtifactUtils;
+import org.apache.flink.client.program.artifact.ArtifactFetchManager;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginManager;
@@ -42,13 +40,9 @@ import org.apache.flink.runtime.security.contexts.SecurityContext;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.FunctionUtils;
 
 import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
+import java.util.Collection;
 
 /** An {@link ApplicationClusterEntryPoint} which is started with a job in a predefined location. */
 @Internal
@@ -122,7 +116,6 @@ public final class StandaloneApplicationClusterEntryPoint extends ApplicationClu
             StandaloneApplicationClusterConfiguration clusterConfiguration) {
         Configuration configuration = loadConfiguration(clusterConfiguration);
         setStaticJobId(clusterConfiguration, configuration);
-        setJobJarFile(clusterConfiguration, configuration);
         SavepointRestoreSettings.toConfiguration(
                 clusterConfiguration.getSavepointRestoreSettings(), configuration);
         return configuration;
@@ -131,16 +124,25 @@ public final class StandaloneApplicationClusterEntryPoint extends ApplicationClu
     private static PackagedProgram getPackagedProgram(
             final StandaloneApplicationClusterConfiguration clusterConfiguration,
             Configuration flinkConfiguration)
-            throws FlinkException {
+            throws Exception {
         final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null);
-        File jarFile =
-                clusterConfiguration.getJarFile() == null
-                        ? null
-                        : fetchJarFileForApplicationMode(flinkConfiguration).get(0);
+
+        File jobJar = null;
+        Collection<File> artifacts = null;
+        if (clusterConfiguration.hasJars()) {
+            ArtifactFetchManager fetchMgr = new ArtifactFetchManager(flinkConfiguration);
+            ArtifactFetchManager.Result res =
+                    fetchMgr.fetchArtifacts(clusterConfiguration.getJars());
+
+            jobJar = res.getJobJar();
+            artifacts = res.getArtifacts();
+        }
+
         final PackagedProgramRetriever programRetriever =
                 DefaultPackagedProgramRetriever.create(
                         userLibDir,
-                        jarFile,
+                        jobJar,
+                        artifacts,
                         clusterConfiguration.getJobClassName(),
                         clusterConfiguration.getArgs(),
                         flinkConfiguration);
@@ -155,26 +157,4 @@ public final class StandaloneApplicationClusterEntryPoint extends ApplicationClu
             configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString());
         }
     }
-
-    private static void setJobJarFile(
-            StandaloneApplicationClusterConfiguration clusterConfiguration,
-            Configuration configuration) {
-        final String jarFile = clusterConfiguration.getJarFile();
-        if (jarFile != null) {
-            configuration.set(PipelineOptions.JARS, Collections.singletonList(jarFile));
-        }
-    }
-
-    private static List<File> fetchJarFileForApplicationMode(Configuration configuration) {
-        String targetDir = generateJarDir(configuration);
-        return configuration.get(PipelineOptions.JARS).stream()
-                .map(
-                        FunctionUtils.uncheckedFunction(
-                                uri -> ArtifactUtils.fetch(uri, configuration, targetDir)))
-                .collect(Collectors.toList());
-    }
-
-    public static String generateJarDir(Configuration configuration) {
-        return configuration.get(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR);
-    }
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java
index 37bde570fad..d7a80774cf5 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.entrypoint.FlinkParseException;
@@ -63,7 +62,10 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
                             new StandaloneApplicationClusterConfigurationParserFactory());
     private static final String JOB_CLASS_NAME = "foobar";
 
-    private static final String JOB_JAR_FILE = "local:///opt/flink/artifacts/my-flink-job.jar";
+    private static final String[] JOB_JARS = {
+        "local:///opt/flink/artifacts/my-flink-job.jar",
+        "local:///opt/flink/artifacts/my-additional-dep.jar"
+    };
 
     @Test
    void testEntrypointClusterConfigurationToConfigurationParsing() throws FlinkParseException {
@@ -87,9 +89,9 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
             String.valueOf(restPort),
             "--job-classname",
             JOB_CLASS_NAME,
+            "--jars",
+            String.join(",", JOB_JARS),
             String.format("-D%s=%s", key, value),
-            "--jar-file",
-            JOB_JAR_FILE,
             arg1,
             arg2
         };
@@ -97,6 +99,7 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
         final StandaloneApplicationClusterConfiguration clusterConfiguration =
                 commandLineParser.parse(args);
         assertThat(clusterConfiguration.getJobClassName()).isEqualTo(JOB_CLASS_NAME);
+        assertThat(clusterConfiguration.getJars()).isEqualTo(JOB_JARS);
         assertThat(clusterConfiguration.getArgs()).contains(arg1, arg2);
 
         final Configuration configuration =
@@ -110,7 +113,6 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
 
         assertThat(configuration.get(RestOptions.PORT)).isEqualTo(restPort);
         assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo(value);
-        assertThat(configuration.get(PipelineOptions.JARS).get(0)).isEqualTo(JOB_JAR_FILE);
     }
 
     @Test
@@ -244,7 +246,7 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
         final String jobClassName = JOB_CLASS_NAME;
         final JobID jobId = new JobID();
         final String savepointRestorePath = "s3://foo/bar";
-        final String jobJarFile = JOB_JAR_FILE;
+        final String jars = String.join(",", JOB_JARS);
         final String[] args = {
             "-c",
             confDirPath,
@@ -254,8 +256,8 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
             jobId.toString(),
             "-s",
             savepointRestorePath,
-            "-jarfile",
-            jobJarFile,
+            "-jars",
+            jars,
             "-n"
         };
 
@@ -265,6 +267,7 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
         assertThat(clusterConfiguration.getConfigDir()).isEqualTo(confDirPath);
         assertThat(clusterConfiguration.getJobClassName()).isEqualTo(jobClassName);
         assertThat(clusterConfiguration.getJobId()).isEqualTo(jobId);
+        assertThat(clusterConfiguration.getJars()).isEqualTo(JOB_JARS);
 
         final SavepointRestoreSettings savepointRestoreSettings =
                 clusterConfiguration.getSavepointRestoreSettings();
@@ -285,12 +288,17 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest {
     }
 
     @Test
-    void testJarFileOption() throws FlinkParseException {
+    void testJarsOption() throws FlinkParseException {
         final String[] args = {
-            "--configDir", confDirPath, "--job-classname", "foobar", "--jar-file", JOB_JAR_FILE
+            "--configDir",
+            confDirPath,
+            "--job-classname",
+            "foobar",
+            "--jars",
+            String.join(",", JOB_JARS)
         };
        final StandaloneApplicationClusterConfiguration applicationClusterConfiguration =
                 commandLineParser.parse(args);
-        assertThat(applicationClusterConfiguration.getJarFile()).isEqualTo(JOB_JAR_FILE);
+        assertThat(applicationClusterConfiguration.getJars()).containsExactlyInAnyOrder(JOB_JARS);
     }
 }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPointTest.java
deleted file mode 100644
index 8efe5f329a4..00000000000
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPointTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.container.entrypoint;
-
-import org.apache.flink.client.cli.ArtifactFetchOptions;
-import org.apache.flink.configuration.Configuration;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.nio.file.Path;
-
-/** Tests for the {@link StandaloneApplicationClusterEntryPointTest}. */
-public class StandaloneApplicationClusterEntryPointTest {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(StandaloneApplicationClusterEntryPointTest.class);
-
-    private Configuration configuration;
-    @TempDir Path tempDir;
-
-    @BeforeEach
-    public void setup() {
-        configuration = new Configuration();
-        configuration.setString(
-                ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, tempDir.toAbsolutePath().toString());
-    }
-
-    @Test
-    public void testGenerateJarDir() {
-        String baseDir = StandaloneApplicationClusterEntryPoint.generateJarDir(configuration);
-        String expectedDir = String.join(File.separator, new String[] {tempDir.toString()});
-        Assertions.assertEquals(expectedDir, baseDir);
-    }
-}
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 460ad9e5288..8e680de327b 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -63,7 +63,8 @@ public final class GlobalConfiguration {
                 "service-key",
                 "token",
                 "basic-auth",
-                "jaas.config"
+                "jaas.config",
+                "http-headers"
             };
 
     // the hidden content to be displayed
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
index 7a35102f2ce..27382e41be4 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java
@@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.configuration;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
-import org.apache.flink.client.cli.ArtifactFetchOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ExternalResourceOptions;
@@ -506,33 +505,6 @@ public class KubernetesConfigOptions {
                                     + "Flink. A typical use-case is when one uses Flink Kubernetes "
                                     + "Operator.");
 
-    public static final ConfigOption<Map<String, String>> KUBERNETES_USER_ARTIFACT_HTTP_HEADER =
-            ConfigOptions.key("kubernetes.user.artifacts.http.header")
-                    .mapType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the application job artifacts. "
-                                    + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");
-
-    public static final ConfigOption<String> KUBERNETES_USER_ARTIFACTS_BASE_DIR =
-            ConfigOptions.key("kubernetes.user.artifacts.base.dir")
-                    .stringType()
-                    .defaultValue("/opt/flink/artifacts")
-                    .withDescription("The base dir to put the application job artifacts.");
-
-    public static final ConfigOption<Boolean> KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE =
-            ConfigOptions.key("kubernetes.user.artifacts.emptyDir.enable")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Whether to enable create mount an empty dir for %s to keep user artifacts if container restart.",
-                                            code(
-                                                    ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR
-                                                            .key()))
-                                    .build());
-
     /**
     * This will only be used to support blocklist mechanism, which is experimental currently, so we
      * do not want to expose this option in the documentation.
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
index 69cdb0c0a33..e3c7672122c 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java
@@ -26,7 +26,7 @@ import org.apache.flink.client.program.DefaultPackagedProgramRetriever;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramRetriever;
 import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.artifact.ArtifactUtils;
+import org.apache.flink.client.program.artifact.ArtifactFetchManager;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.fs.FileSystem;
@@ -41,14 +41,14 @@ import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.FunctionUtils;
 
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** An {@link ApplicationClusterEntryPoint} for Kubernetes. */
 @Internal
@@ -126,11 +126,12 @@ public final class KubernetesApplicationClusterEntrypoint extends ApplicationClu
         // No need to do pipelineJars validation if it is a PyFlink job.
         if (!(PackagedProgramUtils.isPython(jobClassName)
                 || PackagedProgramUtils.isPython(programArguments))) {
-            final List<File> pipelineJarFiles = fetchJarFileForApplicationMode(configuration);
-            Preconditions.checkArgument(pipelineJarFiles.size() == 1, "Should only have one jar");
+            final ArtifactFetchManager.Result fetchRes = fetchArtifacts(configuration);
+
             return DefaultPackagedProgramRetriever.create(
                     userLibDir,
-                    pipelineJarFiles.get(0),
+                    fetchRes.getJobJar(),
+                    fetchRes.getArtifacts(),
                     jobClassName,
                     programArguments,
                     configuration);
@@ -140,29 +141,29 @@ public final class KubernetesApplicationClusterEntrypoint extends ApplicationClu
                 userLibDir, jobClassName, programArguments, configuration);
     }
 
-    /**
-     * Fetch the user jar from path.
-     *
-     * @param configuration Flink Configuration
-     * @return User jar File
-     */
-    public static List<File> fetchJarFileForApplicationMode(Configuration configuration) {
-        String targetDir = generateJarDir(configuration);
-        return configuration.get(PipelineOptions.JARS).stream()
-                .map(
-                        FunctionUtils.uncheckedFunction(
-                                uri -> ArtifactUtils.fetch(uri, configuration, targetDir)))
-                .collect(Collectors.toList());
+    private static ArtifactFetchManager.Result fetchArtifacts(Configuration configuration) {
+        try {
+            String targetDir = generateJarDir(configuration);
+            ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration, targetDir);
+
+            List<String> uris = configuration.get(PipelineOptions.JARS);
+            checkArgument(uris.size() == 1, "Should only have one jar");
+            List<String> additionalUris =
+                    configuration
+                            .getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                            .orElse(Collections.emptyList());
+
+            return fetchMgr.fetchArtifacts(uris.get(0), additionalUris);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    public static String generateJarDir(Configuration configuration) {
+    static String generateJarDir(Configuration configuration) {
         return String.join(
                 File.separator,
-                new String[] {
-                    new File(configuration.get(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR))
-                            .getAbsolutePath(),
-                    configuration.get(KubernetesConfigOptions.NAMESPACE),
-                    configuration.get(KubernetesConfigOptions.CLUSTER_ID)
-                });
+                new File(configuration.get(ArtifactFetchOptions.BASE_DIR)).getAbsolutePath(),
+                configuration.get(KubernetesConfigOptions.NAMESPACE),
+                configuration.get(KubernetesConfigOptions.CLUSTER_ID));
     }
 }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
index 963cef9015c..00977ef8944 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
@@ -91,6 +91,16 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
                 .withDnsPolicy(dnsPolicy)
                 .endSpec();
 
+        // Specify volume for user artifact(s)
+        basicPodBuilder
+                .editOrNewSpec()
+                .addNewVolume()
+                .withName(Constants.USER_ARTIFACTS_VOLUME)
+                .withNewEmptyDir()
+                .endEmptyDir()
+                .endVolume()
+                .endSpec();
+
         // Merge fields
         basicPodBuilder
                 .editOrNewMetadata()
@@ -107,17 +117,7 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
                 .endSpec();
 
        final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());
-        if (flinkConfig.getBoolean(
-                KubernetesConfigOptions.KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE)) {
-            basicPodBuilder
-                    .editOrNewSpec()
-                    .addNewVolume()
-                    .withName(Constants.USER_ARTIFACTS_VOLUME)
-                    .withNewEmptyDir()
-                    .endEmptyDir()
-                    .endVolume()
-                    .endSpec();
-        }
+
         return new FlinkPod.Builder(flinkPod)
                 .withPod(basicPodBuilder.build())
                 .withMainContainer(basicMainContainer)
@@ -160,6 +160,13 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
                 .withImagePullPolicy(imagePullPolicy)
                 .withResources(requirements);
 
+        // Mount volume for user artifact(s)
+        mainContainerBuilder
+                .addNewVolumeMount()
+                .withName(Constants.USER_ARTIFACTS_VOLUME)
+                .withMountPath(kubernetesJobManagerParameters.getUserArtifactsBaseDir())
+                .endVolumeMount();
+
         // Merge fields
         mainContainerBuilder
                 .addAllToPorts(getContainerPorts())
@@ -171,14 +178,7 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
                                 .withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
                                 .build())
                 .endEnv();
-        if (flinkConfig.getBoolean(
-                KubernetesConfigOptions.KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE)) {
-            mainContainerBuilder
-                    .addNewVolumeMount()
-                    .withName(Constants.USER_ARTIFACTS_VOLUME)
-                    .withMountPath(kubernetesJobManagerParameters.getUserArtifactsBaseDir())
-                    .endVolumeMount();
-        }
+
         getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv);
         return mainContainerBuilder.build();
     }
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
index c19c61a7eef..845f930d18d 100644
--- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
+++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java
@@ -207,6 +207,6 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters
     }
 
     public String getUserArtifactsBaseDir() {
-        return flinkConfig.getString(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR);
+        return flinkConfig.getString(ArtifactFetchOptions.BASE_DIR);
     }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java
index 1925a722b4c..6ea22c6b3bb 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java
@@ -44,8 +44,7 @@ public class KubernetesApplicationClusterEntrypointTest {
     @BeforeEach
     public void setup() {
         configuration = new Configuration();
-        configuration.setString(
-                ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, tempDir.toAbsolutePath().toString());
+        configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString());
         configuration.setString(KubernetesConfigOptions.NAMESPACE, TEST_NAMESPACE);
         configuration.setString(KubernetesConfigOptions.CLUSTER_ID, TEST_CLUSTER_ID);
     }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
index 83c17a14db0..e9b6aafadb9 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java
@@ -491,7 +491,7 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase {
 
     @Test
     public void testArtifactsEmptyDirVolume() throws IOException {
-        flinkConfig.set(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, "/opt/artifacts");
+        flinkConfig.set(ArtifactFetchOptions.BASE_DIR, "/opt/artifacts");
        kubernetesJobManagerSpecification =
                 KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
                         flinkPod, kubernetesJobManagerParameters);
@@ -509,26 +509,4 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase {
                                                         kubernetesJobManagerParameters
                                                                 .getUserArtifactsBaseDir()));
     }
-
-    @Test
-    public void testTurnOffArtifactsEmptyDirVolume() throws IOException {
-        flinkConfig.set(KubernetesConfigOptions.KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE, false);
-        flinkConfig.set(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, "/opt/artifacts");
-        kubernetesJobManagerSpecification =
-                KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
-                        flinkPod, kubernetesJobManagerParameters);
-        final PodSpec podSpec =
-                kubernetesJobManagerSpecification.getDeployment().getSpec().getTemplate().getSpec();
-        assertThat(podSpec.getVolumes())
-                .noneMatch(resource -> resource.getName().equals(Constants.USER_ARTIFACTS_VOLUME));
-        final Container container = podSpec.getContainers().get(0);
-        assertThat(container.getVolumeMounts())
-                .noneMatch(
-                        resource ->
-                                resource.getName().equals(Constants.USER_ARTIFACTS_VOLUME)
-                                        && resource.getMountPath()
-                                                .equals(
-                                                        kubernetesJobManagerParameters
-                                                                .getUserArtifactsBaseDir()));
-    }
 }
diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
index 9de8ac968ed..67a6fd51fbf 100644
--- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
+++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java
@@ -258,7 +258,7 @@ class KubernetesJobManagerParametersTest extends KubernetesTestBase {
 
     @Test
     public void testGetUserArtifactsBaseDir() {
-        flinkConfig.set(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, "/opt/job/artifacts");
+        flinkConfig.set(ArtifactFetchOptions.BASE_DIR, "/opt/job/artifacts");
         assertThat(kubernetesJobManagerParameters.getUserArtifactsBaseDir())
                 .isEqualTo("/opt/job/artifacts");
     }
diff --git a/flink-test-utils-parent/flink-clients-test-utils/pom.xml b/flink-test-utils-parent/flink-clients-test-utils/pom.xml
index 33f51a7c923..608acf6e679 100644
--- a/flink-test-utils-parent/flink-clients-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-clients-test-utils/pom.xml
@@ -93,6 +93,26 @@ under the License.
                                                        <shadedClassifierName>job-lib-jar</shadedClassifierName>
                                                </configuration>
                                        </execution>
+                                       <execution>
+                                               <id>test-user-classloader-no-main-jar</id>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <filters>
+                                                               <filter>
+                                                                       <artifact>*</artifact>
+                                                                       <includes>
+                                                                               <include>**/TestUserClassLoaderAdditionalArtifact.*</include>
+                                                                               <include>META-INF/**</include>
+                                                                       </includes>
+                                                               </filter>
+                                                       </filters>
+                                                       <finalName>test-user-classloader-additional-artifact-jar</finalName>
+                                                       <shadedArtifactAttached>true</shadedArtifactAttached>
+                                                       <shadedClassifierName>additional-artifact-jar</shadedClassifierName>
+                                               </configuration>
+                                       </execution>
                                </executions>
                        </plugin>
                </plugins>
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java b/flink-test-utils-parent/flink-clients-test-utils/src/main/java/org/apache/flink/client/testjar/TestUserClassLoaderAdditionalArtifact.java
similarity index 56%
copy from flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java
copy to flink-test-utils-parent/flink-clients-test-utils/src/main/java/org/apache/flink/client/testjar/TestUserClassLoaderAdditionalArtifact.java
index 8d43998079b..35800679745 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java
+++ b/flink-test-utils-parent/flink-clients-test-utils/src/main/java/org/apache/flink/client/testjar/TestUserClassLoaderAdditionalArtifact.java
@@ -16,23 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.program.artifact;
+package org.apache.flink.client.testjar;
 
-import org.apache.flink.configuration.Configuration;
+/** This class is used to test classloading from additional user artifacts. */
+public class TestUserClassLoaderAdditionalArtifact {
 
-import java.io.File;
-
-/** The artifact fetcher. */
-public interface ArtifactFetcher {
-
-    /**
-     * Fetch the resource from the uri to the targetDir.
-     *
-     * @param uri The artifact to be fetched.
-     * @param flinkConfiguration Flink configuration.
-     * @param targetDir The target dir to put the artifact.
-     * @return The path of the fetched artifact.
-     * @throws Exception Error during fetching the artifact.
-     */
-    File fetch(String uri, Configuration flinkConfiguration, File targetDir) throws Exception;
+    int getNum() {
+        return 1;
+    }
 }
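For context on the hunk above: the removed lines show the `ArtifactFetcher` contract (`fetch(uri, flinkConfiguration, targetDir)` returning the local `File`), which the commit's new `FsArtifactFetcher`, `HttpArtifactFetcher`, and `LocalArtifactFetcher` implement. A minimal self-contained sketch of a fetcher against that shape is below; the `CopyingArtifactFetcher` name, the verbatim file-copy behavior, and the `Object`-typed configuration parameter (standing in for Flink's `Configuration` so the sketch compiles without Flink on the classpath) are all illustrative, not part of this commit.

```java
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Stand-in mirroring org.apache.flink.client.program.artifact.ArtifactFetcher,
// with Object replacing Flink's Configuration so no Flink dependency is needed.
interface ArtifactFetcher {
    File fetch(String uri, Object flinkConfiguration, File targetDir) throws Exception;
}

/** Illustrative fetcher: treats the URI as a local path and copies it into targetDir. */
class CopyingArtifactFetcher implements ArtifactFetcher {
    @Override
    public File fetch(String uri, Object flinkConfiguration, File targetDir) throws Exception {
        Path source = Path.of(uri);
        // Place the artifact under targetDir, keeping its original file name.
        Path target = targetDir.toPath().resolve(source.getFileName());
        Files.createDirectories(target.getParent());
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target.toFile();
    }
}

public class FetcherDemo {
    public static void main(String[] args) throws Exception {
        Path src = Files.createTempFile("job", ".jar");
        File targetDir = Files.createTempDirectory("artifacts").toFile();
        File fetched = new CopyingArtifactFetcher().fetch(src.toString(), null, targetDir);
        System.out.println(fetched.exists()); // prints "true"
    }
}
```

The real fetchers follow the same pattern, differing only in transport: filesystem `FileSystem` access, HTTP download, or a no-op for already-local artifacts.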
