This is an automated email from the ASF dual-hosted git repository.

mbalassi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e63aa12252843d0098a56f3091b28d48aff5b5af
Author: Ferenc Csaky <ferenc.cs...@pm.me>
AuthorDate: Wed Jan 10 17:19:57 2024 +0100

    [FLINK-28915][k8s] Support fetching remote job jar and additional dependencies on Kubernetes

    Closes #24065.
---
 docs/content.zh/docs/deployment/config.md          |   9 +-
 .../resource-providers/native_kubernetes.md        |  16 +-
 .../resource-providers/standalone/docker.md        |  28 +-
 .../resource-providers/standalone/kubernetes.md    |   6 +-
 docs/content/docs/deployment/config.md             |   9 +-
 .../resource-providers/native_kubernetes.md        |  15 +-
 .../resource-providers/standalone/docker.md        |  28 +-
 .../resource-providers/standalone/kubernetes.md    |   6 +-
 .../resource-providers/standalone/overview.md      |  16 +-
 .../generated/artifact_fetch_configuration.html    |  18 +-
 .../generated/kubernetes_config_configuration.html |   6 -
 flink-clients/pom.xml                              |   8 +
 .../flink/client/cli/ArtifactFetchOptions.java     |  30 ++-
 .../program/DefaultPackagedProgramRetriever.java   |  80 ++++--
 .../program/artifact/ArtifactFetchManager.java     | 195 ++++++++++++++
 .../client/program/artifact/ArtifactFetcher.java   |   7 +-
 .../client/program/artifact/ArtifactUtils.java     |  50 ++--
 ...ArtifactFetcher.java => FsArtifactFetcher.java} |  14 +-
 .../program/artifact/HttpArtifactFetcher.java      |  12 +-
 .../program/artifact/LocalArtifactFetcher.java     |  42 +++
 .../DefaultPackagedProgramRetrieverITCase.java     | 125 ++++++++-
 .../program/artifact/ArtifactFetchManagerTest.java | 281 +++++++++++++++++++++
 .../client/program/artifact/ArtifactUtilsTest.java | 123 +--------
 .../client/testjar/ClasspathProviderExtension.java |  11 +
 .../StandaloneApplicationClusterConfiguration.java |  14 +-
 ...plicationClusterConfigurationParserFactory.java |  19 +-
 .../StandaloneApplicationClusterEntryPoint.java    |  54 ++--
 ...ationClusterConfigurationParserFactoryTest.java |  30 ++-
 ...StandaloneApplicationClusterEntryPointTest.java |  56 ----
 .../flink/configuration/GlobalConfiguration.java   |   3 +-
 .../configuration/KubernetesConfigOptions.java     |  28 --
 .../KubernetesApplicationClusterEntrypoint.java    |  55 ++--
 .../decorators/InitJobManagerDecorator.java        |  38 +--
 .../parameters/KubernetesJobManagerParameters.java |   2 +-
 ...KubernetesApplicationClusterEntrypointTest.java |   3 +-
 .../factory/KubernetesJobManagerFactoryTest.java   |  24 +-
 .../KubernetesJobManagerParametersTest.java        |   2 +-
 .../flink-clients-test-utils/pom.xml               |  20 ++
 .../TestUserClassLoaderAdditionalArtifact.java     |  23 +-
 39 files changed, 1000 insertions(+), 506 deletions(-)

diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 7561852e4d0..35b85240d03 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -318,10 +318,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" >
 ----
 ----
 
-# Artifact Fetch
+# Artifact Fetching
+
+Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint.
+{{< hint info >}}
+**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode.
+{{< /hint >}}
 
-*Artifact Fetch* is a features that Flink will fetch user artifact stored in DFS or download by HTTP/HTTPS.
-Note that it is only supported in StandAlone Application Mode and Native Kubernetes Application Mode.
 {{< generated/artifact_fetch_configuration >}}
 
 ----
diff --git a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
index 048a4e800c1..4f3a537cdbc 100644
--- a/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
+++ b/docs/content.zh/docs/deployment/resource-providers/native_kubernetes.md
@@ -107,26 +107,24 @@ $ ./bin/flink run-application \
 
 # FileSystem
 $ ./bin/flink run-application \
     --target kubernetes-application \
-    -Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image=custom-image-name \
     s3://my-bucket/my-flink-job.jar
 
-# Http/Https Schema
+# HTTP(S)
 $ ./bin/flink run-application \
     --target kubernetes-application \
     -Dkubernetes.cluster-id=my-first-application-cluster \
     -Dkubernetes.container.image=custom-image-name \
-    http://ip:port/my-flink-job.jar
+    https://ip:port/my-flink-job.jar
 ```
 
 {{< hint info >}}
-Now, The jar artifact supports downloading from the [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https in Application Mode.
-The jar package will be downloaded from filesystem to
-[user.artifacts.base.dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
+JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode.
+The JAR will be downloaded to
+[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image.
{{< /hint >}} -<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the jar must be provided in the image or download by a init container like [Example]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#example-of-pod-template). - +<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template). The `kubernetes.cluster-id` option specifies the cluster name and must be unique. If you do not specify this option, then Flink will generate a random name. @@ -348,7 +346,7 @@ $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edi ``` If you do not want to use the `default` service account, use the following command to create a new `flink-service-account` service account and set the role binding. -Then use the config option `-Dkubernetes.service-account=flink-service-account` to make the JobManager pod use the `flink-service-account` service account to create/delete TaskManager pods and leader ConfigMaps. +Then use the config option `-Dkubernetes.service-account=flink-service-account` to configure the JobManager pod's service account used to create and delete TaskManager pods and leader ConfigMaps. Also this will allow the TaskManager to watch leader ConfigMaps to retrieve the address of JobManager and ResourceManager. 
```bash diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md b/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md index ea12beb65ec..a4c6a37819f 100644 --- a/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md +++ b/docs/content.zh/docs/deployment/resource-providers/standalone/docker.md @@ -121,7 +121,7 @@ The *job artifacts* are included into the class path of Flink's JVM process with * all other necessary dependencies or resources, not included into Flink. To deploy a cluster for a single job with Docker, you need to -* make *job artifacts* available locally in all containers under `/opt/flink/usrlib` or pass jar path by *jar-file* argument. +* make *job artifacts* available locally in all containers under `/opt/flink/usrlib`, or pass a list of jars via the `--jars` argument * start a JobManager container in the *Application cluster* mode * start the required number of TaskManager containers. @@ -156,7 +156,6 @@ To make the **job artifacts available** locally in the container, you can * **or extend the Flink image** by writing a custom `Dockerfile`, build it and use it for starting the JobManager and TaskManagers: - ```dockerfile FROM flink ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1 @@ -175,8 +174,7 @@ To make the **job artifacts available** locally in the container, you can $ docker run flink_with_job_artifacts taskmanager ``` -* **or pass jar path by jar-file argument** when you start the JobManager: - +* **or pass jar path by `jars` argument** when you start the JobManager: ```sh $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" @@ -184,27 +182,31 @@ To make the **job artifacts available** locally in the container, you can $ docker run \ --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \ - --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \ + --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \ --name=jobmanager \ --network 
flink-network \ flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \ --job-classname com.job.ClassName \ - --jar-file s3://my-bucket/my-flink-job.jar + --jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar \ [--job-id <job id>] \ [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \ [job arguments] - + ``` + The `standalone-job` argument starts a JobManager container in the Application Mode. #### JobManager additional command line arguments You can provide the following additional command line arguments to the cluster entrypoint: -* `--job-classname <job class name>`: Class name of the job to run. +* `--job-classname <job class name>` (optional): Class name of the job to run. By default, Flink scans its class path for a JAR with a Main-Class or program-class manifest entry and chooses it as the job class. Use this command line argument to manually set the job class. + + {{< hint warning >}} This argument is required in case that no or more than one JAR with such a manifest entry is available on the class path. + {{< /hint >}} * `--job-id <job id>` (optional): Manually set a Flink job ID for the job (default: 00000000000000000000000000000000) @@ -216,12 +218,12 @@ You can provide the following additional command line arguments to the cluster e * `--allowNonRestoredState` (optional): Skip broken savepoint state - Additionally you can specify this argument to allow that savepoint state is skipped which cannot be restored. + Additionally, you can specify this argument to allow that savepoint state is skipped which cannot be restored. -* `--jar-file` (optional): the path of jar artifact +* `--jars` (optional): the paths of the job jar and any additional artifact(s) separated by commas - You can specify this argument to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. 
Flink will fetch it when deploy the job. - (e.g., s3://my-bucket/my-flink-job.jar). + You can specify this argument to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or download via HTTP(S). + Flink will fetch these during the job deployment. (e.g. `--jars s3://my-bucket/my-flink-job.jar`, `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar` ). If the main function of the user job main class accepts arguments, you can also pass them at the end of the `docker run` command. @@ -326,7 +328,7 @@ services: image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} ports: - "8081:8081" - command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] ["--jar-file" "/path/to/user-artifact"] [job arguments] + command: standalone-job --job-classname com.job.ClassName [--jars /path/to/artifact1,/path/to/artifact2] [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] [job arguments] volumes: - /host/path/to/job/artifacts:/opt/flink/usrlib environment: diff --git a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md index c1a3069ac51..95fb16d7dec 100644 --- a/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content.zh/docs/deployment/resource-providers/standalone/kubernetes.md @@ -120,7 +120,7 @@ $ ./bin/flink run -m localhost:8081 ./examples/streaming/TopSpeedWindowing.jar * 可以从 [资源定义示例](#application-cluster-resource-definitions) 中的 `job-artifacts-volume` 处获取。假如是在 minikube 集群中创建这些组件,那么定义示例中的 job-artifacts-volume 可以挂载为主机的本地目录。如果不使用 minikube 集群,那么可以使用 Kubernetes 集群中任何其它可用类型的 volume 来提供 *job artifacts* * 构建一个已经包含 *job artifacts* 参数的[自定义镜像]({{< ref 
"docs/deployment/resource-providers/standalone/docker" >}}#advanced-customization)。 -* 通过指定[--jar file]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments)参数提供 存储在DFS或者可由HTTP/HTTPS下载的*job artifacts*路径 +* 通过指定[--jars]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments)参数提供 存储在DFS或者可由HTTP/HTTPS下载的*job artifacts*路径 在创建[通用集群组件](#common-cluster-resource-definitions)后,指定 [Application 集群资源定义](#application-cluster-resource-definitions)文件,执行 `kubectl` 命令来启动 Flink Application 集群: @@ -627,7 +627,7 @@ spec: - name: jobmanager image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} env: - args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"] + args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选的参数项: ["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] ports: - containerPort: 6123 name: rpc @@ -686,7 +686,7 @@ spec: apiVersion: v1 fieldPath: status.podIP # 下面的 args 参数会使用 POD_IP 对应的值覆盖 config map 中 jobmanager.rpc.address 的属性值。 - args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选参数项: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"] + args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # 可选参数项: ["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", "/path/to/savepoint", 
"--allowNonRestoredState"] ports: - containerPort: 6123 name: rpc diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index 53f03a29cfb..c6546630888 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -320,10 +320,13 @@ See the [History Server Docs]({{< ref "docs/deployment/advanced/historyserver" > ---- ---- -# Artifact Fetch +# Artifact Fetching + +Flink can fetch user artifacts stored locally, on remote DFS, or accessible via an HTTP(S) endpoint. +{{< hint info >}} +**Note:** This is only supported in Standalone Application Mode and Native Kubernetes Application Mode. +{{< /hint >}} -*Artifact Fetch* is a features that Flink will fetch user artifact stored in DFS or download by HTTP/HTTPS. -Note that it is only supported in StandAlone Application Mode and Native Kubernetes Application Mode. {{< generated/artifact_fetch_configuration >}} ---- diff --git a/docs/content/docs/deployment/resource-providers/native_kubernetes.md b/docs/content/docs/deployment/resource-providers/native_kubernetes.md index 0bc75c26f2b..8b676ab983b 100644 --- a/docs/content/docs/deployment/resource-providers/native_kubernetes.md +++ b/docs/content/docs/deployment/resource-providers/native_kubernetes.md @@ -119,19 +119,20 @@ $ ./bin/flink run-application \ -Dkubernetes.container.image=custom-image-name \ s3://my-bucket/my-flink-job.jar -# Http/Https Schema +# HTTP(S) $ ./bin/flink run-application \ --target kubernetes-application \ -Dkubernetes.cluster-id=my-first-application-cluster \ -Dkubernetes.container.image=custom-image-name \ - http://ip:port/my-flink-job.jar + https://ip:port/my-flink-job.jar ``` {{< hint info >}} -Now, The jar artifact supports downloading from the [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https in Application Mode. 
-The jar package will be downloaded from filesystem to -[user.artifacts.base.dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image. +JAR fetching supports downloading from [filesystems]({{< ref "docs/deployment/filesystems/overview" >}}) or HTTP(S) in Application Mode. +The JAR will be downloaded to +[user.artifacts.base-dir]({{< ref "docs/deployment/config" >}}#user-artifacts-base-dir)/[kubernetes.namespace]({{< ref "docs/deployment/config" >}}#kubernetes-namespace)/[kubernetes.cluster-id]({{< ref "docs/deployment/config" >}}#kubernetes-cluster-id) path in image. {{< /hint >}} -<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the jar must be provided in the image or download by a init container like [Example]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#example-of-pod-template). + +<span class="label label-info">Note</span> `local` schema is still supported. If you use `local` schema, the JAR must be provided in the image or downloaded by an init container as described in [this example](#example-of-pod-template). The `kubernetes.cluster-id` option specifies the cluster name and must be unique. If you do not specify this option, then Flink will generate a random name. @@ -353,7 +354,7 @@ $ kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edi ``` If you do not want to use the `default` service account, use the following command to create a new `flink-service-account` service account and set the role binding. -Then use the config option `-Dkubernetes.service-account=flink-service-account` to make the JobManager pod use the `flink-service-account` service account to create/delete TaskManager pods and leader ConfigMaps. 
+Then use the config option `-Dkubernetes.service-account=flink-service-account` to configure the JobManager pod's service account used to create and delete TaskManager pods and leader ConfigMaps. Also this will allow the TaskManager to watch leader ConfigMaps to retrieve the address of JobManager and ResourceManager. ```bash diff --git a/docs/content/docs/deployment/resource-providers/standalone/docker.md b/docs/content/docs/deployment/resource-providers/standalone/docker.md index 3b9917089da..ba355b69ced 100644 --- a/docs/content/docs/deployment/resource-providers/standalone/docker.md +++ b/docs/content/docs/deployment/resource-providers/standalone/docker.md @@ -121,7 +121,7 @@ The *job artifacts* are included into the class path of Flink's JVM process with * all other necessary dependencies or resources, not included into Flink. To deploy a cluster for a single job with Docker, you need to -* make *job artifacts* available locally in all containers under `/opt/flink/usrlib` or pass jar path by *jar-file* argument. +* make *job artifacts* available locally in all containers under `/opt/flink/usrlib`, or pass a list of jars via the `--jars` argument * start a JobManager container in the *Application cluster* mode * start the required number of TaskManager containers. 
@@ -156,7 +156,6 @@ To make the **job artifacts available** locally in the container, you can * **or extend the Flink image** by writing a custom `Dockerfile`, build it and use it for starting the JobManager and TaskManagers: - ```dockerfile FROM flink ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1 @@ -175,8 +174,7 @@ To make the **job artifacts available** locally in the container, you can $ docker run flink_with_job_artifacts taskmanager ``` -* **or pass jar path by jar-file argument** when you start the JobManager: - +* **or pass jar path by `jars` argument** when you start the JobManager: ```sh $ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" @@ -184,27 +182,31 @@ To make the **job artifacts available** locally in the container, you can $ docker run \ --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \ - --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-1.17-SNAPSHOT.jar \ + --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \ --name=jobmanager \ --network flink-network \ flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \ --job-classname com.job.ClassName \ - --jar-file s3://my-bucket/my-flink-job.jar + --jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar \ [--job-id <job id>] \ [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \ [job arguments] - + ``` + The `standalone-job` argument starts a JobManager container in the Application Mode. #### JobManager additional command line arguments You can provide the following additional command line arguments to the cluster entrypoint: -* `--job-classname <job class name>`: Class name of the job to run. +* `--job-classname <job class name>` (optional): Class name of the job to run. By default, Flink scans its class path for a JAR with a Main-Class or program-class manifest entry and chooses it as the job class. Use this command line argument to manually set the job class. 
+ + {{< hint warning >}} This argument is required in case that no or more than one JAR with such a manifest entry is available on the class path. + {{< /hint >}} * `--job-id <job id>` (optional): Manually set a Flink job ID for the job (default: 00000000000000000000000000000000) @@ -216,12 +218,12 @@ You can provide the following additional command line arguments to the cluster e * `--allowNonRestoredState` (optional): Skip broken savepoint state - Additionally you can specify this argument to allow that savepoint state is skipped which cannot be restored. + Additionally, you can specify this argument to allow that savepoint state is skipped which cannot be restored. -* `--jar-file` (optional): the path of jar artifact +* `--jars` (optional): the paths of the job jar and any additional artifact(s) separated by commas - You can specify this argument to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. Flink will fetch it when deploy the job. - (e.g., s3://my-bucket/my-flink-job.jar). + You can specify this argument to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or download via HTTP(S). + Flink will fetch these during the job deployment. (e.g. `--jars s3://my-bucket/my-flink-job.jar`, `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar` ). If the main function of the user job main class accepts arguments, you can also pass them at the end of the `docker run` command. 
@@ -326,7 +328,7 @@ services: image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} ports: - "8081:8081" - command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] ["--jar-file" "/path/to/user-artifact"] [job arguments] + command: standalone-job --job-classname com.job.ClassName [--job-id <job id>] [--jars /path/to/artifact1,/path/to/artifact2] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] [job arguments] volumes: - /host/path/to/job/artifacts:/opt/flink/usrlib environment: diff --git a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md index c2759d45fa6..f050a19e21c 100644 --- a/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md +++ b/docs/content/docs/deployment/resource-providers/standalone/kubernetes.md @@ -124,7 +124,7 @@ The *job artifacts* could be provided by these way: The definition examples mount the volume as a local directory of the host assuming that you create the components in a minikube cluster. If you do not use a minikube cluster, you can use any other type of volume, available in your Kubernetes cluster, to supply the *job artifacts*. * You can build [a custom image]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#advanced-customization) which already contains the artifacts instead. -* You can specify [--jar file]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments)arguments to point the job artifacts stored in [flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or Http/Https. 
+* You can pass artifacts via the [--jars]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments) option that are stored locally, on [remote DFS]({{< ref "docs/deployment/filesystems/overview" >}}), or accessible via an HTTP(S) endpoint. After creating [the common cluster components](#common-cluster-resource-definitions), use [the Application cluster specific resource definitions](#application-cluster-resource-definitions) to launch the cluster with the `kubectl` command: @@ -613,7 +613,7 @@ spec: - name: jobmanager image: apache/flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} env: - args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"] + args: ["standalone-job", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] ports: - containerPort: 6123 name: rpc @@ -672,7 +672,7 @@ spec: apiVersion: v1 fieldPath: status.podIP # The following args overwrite the value of jobmanager.rpc.address configured in the configuration config map to POD_IP. 
- args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState", "--jar-file", "/path/to/user-artifact"] + args: ["standalone-job", "--host", "$(POD_IP)", "--job-classname", "com.job.ClassName", <optional arguments>, <job arguments>] # optional arguments: ["--job-id", "<job id>", "--jars", "/path/to/artifact1,/path/to/artifact2", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"] ports: - containerPort: 6123 name: rpc diff --git a/docs/content/docs/deployment/resource-providers/standalone/overview.md b/docs/content/docs/deployment/resource-providers/standalone/overview.md index 1c0b4b94ea9..7c37f2789e1 100644 --- a/docs/content/docs/deployment/resource-providers/standalone/overview.md +++ b/docs/content/docs/deployment/resource-providers/standalone/overview.md @@ -96,13 +96,25 @@ Then, we can launch the JobManager: $ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing ``` -The web interface is now available at [localhost:8081](http://localhost:8081). However, the application won't be able to start, because there are no TaskManagers running yet: +The web interface is now available at [localhost:8081](http://localhost:8081). + +{{< hint info >}} +Another approach would be to use the artifact fetching mechanism via the `--jars` option: + +```bash +$ ./bin/standalone-job.sh start -D user.artifacts.base-dir=/tmp/flink-artifacts --jars local:///path/to/TopSpeedWindowing.jar +``` + +Read more about this CLI option [here]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#jobmanager-additional-command-line-arguments). 
+{{< /hint >}} + +However, the application won't be able to start, because there are no TaskManagers running yet: ```bash $ ./bin/taskmanager.sh start ``` -Note: You can start multiple TaskManagers, if your application needs more resources. +<span class="label label-info">Note</span> You can start multiple TaskManagers, if your application needs more resources. Stopping the services is also supported via the scripts. Call them multiple times if you want to stop multiple instances, or use `stop-all`: diff --git a/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html b/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html index be2cdb4a324..c97d999fccb 100644 --- a/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html +++ b/docs/layouts/shortcodes/generated/artifact_fetch_configuration.html @@ -9,16 +9,28 @@ </thead> <tbody> <tr> - <td><h5>user.artifacts.base.dir</h5></td> + <td><h5>user.artifacts.artifact-list</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>List<String></td> + <td>A semicolon-separated list of the additional artifacts to fetch for the job before setting up the application cluster. All given elements have to be valid URIs. Example: s3://sandbox-bucket/format.jar;http://sandbox-server:1234/udf.jar</td> + </tr> + <tr> + <td><h5>user.artifacts.base-dir</h5></td> <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td> <td>String</td> <td>The base dir to put the application job artifacts.</td> </tr> <tr> - <td><h5>user.artifacts.http.header</h5></td> + <td><h5>user.artifacts.http-headers</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>Map</td> - <td>Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the application job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td> + <td>Custom HTTP header(s) for the HTTP artifact fetcher. The header(s) will be applied when getting the application job artifacts. 
Expected format: headerKey1:headerValue1,headerKey2:headerValue2.</td> + </tr> + <tr> + <td><h5>user.artifacts.raw-http-enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Enables artifact fetching from raw HTTP endpoints.</td> </tr> </tbody> </table> diff --git a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html index a0c530045cf..7bbdfd5e404 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_config_configuration.html @@ -290,11 +290,5 @@ <td>Integer</td> <td>Defines the number of Kubernetes transactional operation retries before the client gives up. For example, <code class="highlighter-rouge">FlinkKubeClient#checkAndUpdateConfigMap</code>.</td> </tr> - <tr> - <td><h5>kubernetes.user.artifacts.emptyDir.enable</h5></td> - <td style="word-wrap: break-word;">true</td> - <td>Boolean</td> - <td>Whether to enable create mount an empty dir for <code class="highlighter-rouge">user.artifacts.base.dir</code> to keep user artifacts if container restart.</td> - </tr> </tbody> </table> diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index d80b06a1e6b..6f654a2c3f8 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -118,6 +118,14 @@ under the License. 
<scope>test</scope> <classifier>job-lib-jar</classifier> </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <classifier>additional-artifact-jar</classifier> + </dependency> </dependencies> <!-- More information on this: diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java index 2ea2fc4980a..2e634fcccb2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ArtifactFetchOptions.java @@ -21,22 +21,40 @@ package org.apache.flink.client.cli; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; +import java.util.List; import java.util.Map; +import static org.apache.flink.configuration.ConfigOptions.key; + /** Artifact Fetch options. */ public class ArtifactFetchOptions { - public static final ConfigOption<String> USER_ARTIFACTS_BASE_DIR = - ConfigOptions.key("user.artifacts.base.dir") + public static final ConfigOption<String> BASE_DIR = + ConfigOptions.key("user.artifacts.base-dir") .stringType() .defaultValue("/opt/flink/artifacts") .withDescription("The base dir to put the application job artifacts."); - public static final ConfigOption<Map<String, String>> USER_ARTIFACT_HTTP_HEADER = - ConfigOptions.key("user.artifacts.http.header") + public static final ConfigOption<List<String>> ARTIFACT_LIST = + key("user.artifacts.artifact-list") + .stringType() + .asList() + .noDefaultValue() + .withDescription( + "A semicolon-separated list of the additional artifacts to fetch for the job before setting up the application cluster." + + " All given elements have to be valid URIs. 
Example: s3://sandbox-bucket/format.jar;http://sandbox-server:1234/udf.jar"); + + public static final ConfigOption<Boolean> RAW_HTTP_ENABLED = + ConfigOptions.key("user.artifacts.raw-http-enabled") + .booleanType() + .defaultValue(false) + .withDescription("Enables artifact fetching from raw HTTP endpoints."); + + public static final ConfigOption<Map<String, String>> HTTP_HEADERS = + ConfigOptions.key("user.artifacts.http-headers") .mapType() .noDefaultValue() .withDescription( - "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the application job artifacts. " - + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2."); + "Custom HTTP header(s) for the HTTP artifact fetcher. The header(s) will be applied when getting the application job artifacts." + + " Expected format: headerKey1:headerValue1,headerKey2:headerValue2."); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java index cb54d00dac3..654db0200c1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/DefaultPackagedProgramRetriever.java @@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; import org.apache.flink.util.function.FunctionUtils; import javax.annotation.Nullable; @@ -36,12 +35,17 @@ import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import 
java.util.NoSuchElementException; import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.Preconditions.checkNotNull; /** * {@code PackageProgramRetrieverImpl} is the default implementation of {@link @@ -56,17 +60,8 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever private final Configuration configuration; /** - * Creates a {@code PackageProgramRetrieverImpl} with the given parameters. - * - * @param userLibDir The user library directory that is used for generating the user classpath - * if specified. The system classpath is used if not specified. - * @param jobClassName The job class that will be used if specified. The classpath is used to - * detect any main class if not specified. - * @param programArgs The program arguments. - * @param configuration The Flink configuration for the given job. - * @return The {@code PackageProgramRetrieverImpl} that can be used to create a {@link - * PackagedProgram} instance. - * @throws FlinkException If something goes wrong during instantiation. + * See ${@link DefaultPackagedProgramRetriever#create(File, File, Collection, String, String[], + * Configuration)} . */ public static DefaultPackagedProgramRetriever create( @Nullable File userLibDir, @@ -77,6 +72,20 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever return create(userLibDir, null, jobClassName, programArgs, configuration); } + /** + * See ${@link DefaultPackagedProgramRetriever#create(File, File, Collection, String, String[], + * Configuration)} . + */ + public static DefaultPackagedProgramRetriever create( + @Nullable File userLibDir, + @Nullable File jarFile, + @Nullable String jobClassName, + String[] programArgs, + Configuration configuration) + throws FlinkException { + return create(userLibDir, jarFile, null, jobClassName, programArgs, configuration); + } + /** * Creates a {@code PackageProgramRetrieverImpl} with the given parameters. 
* @@ -84,6 +93,8 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever * if specified. The system classpath is used if not specified. * @param jarFile The jar archive expected to contain the job class included; {@code null} if * the job class is on the system classpath. + * @param userArtifacts The user artifacts that should be added to the user classpath if + * specified. * @param jobClassName The job class to use; if {@code null} the user classpath (or, if not set, * the system classpath) will be scanned for possible main class. * @param programArgs The program arguments. @@ -95,18 +106,23 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever public static DefaultPackagedProgramRetriever create( @Nullable File userLibDir, @Nullable File jarFile, + @Nullable Collection<File> userArtifacts, @Nullable String jobClassName, String[] programArgs, Configuration configuration) throws FlinkException { List<URL> userClasspaths; try { - final List<URL> classpathsFromUserLibDir = getClasspathsFromUserLibDir(userLibDir); + final List<URL> classpathsFromUserLibDir = + getClasspathsFromUserDir(userLibDir, jarFile); + final List<URL> classpathsFromUserArtifactDir = + getClasspathsFromArtifacts(userArtifacts, jarFile); final List<URL> classpathsFromConfiguration = getClasspathsFromConfiguration(configuration); final List<URL> classpaths = new ArrayList<>(); classpaths.addAll(classpathsFromUserLibDir); + classpaths.addAll(classpathsFromUserArtifactDir); classpaths.addAll(classpathsFromConfiguration); userClasspaths = Collections.unmodifiableList(classpaths); @@ -116,7 +132,7 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever final EntryClassInformationProvider entryClassInformationProvider = createEntryClassInformationProvider( - userLibDir == null ? null : userClasspaths, + (userLibDir == null && userArtifacts == null) ? 
null : userClasspaths, jarFile, jobClassName, programArgs); @@ -185,13 +201,12 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever List<URL> userClasspath, Configuration configuration) { this.entryClassInformationProvider = - Preconditions.checkNotNull( + checkNotNull( entryClassInformationProvider, "No EntryClassInformationProvider passed."); this.programArguments = - Preconditions.checkNotNull(programArguments, "No program parameter array passed."); - this.userClasspath = Preconditions.checkNotNull(userClasspath, "No user classpath passed."); - this.configuration = - Preconditions.checkNotNull(configuration, "No Flink configuration was passed."); + checkNotNull(programArguments, "No program parameter array passed."); + this.userClasspath = checkNotNull(userClasspath, "No user classpath passed."); + this.configuration = checkNotNull(configuration, "No Flink configuration was passed."); } @Override @@ -216,15 +231,34 @@ public class DefaultPackagedProgramRetriever implements PackagedProgramRetriever } } - private static List<URL> getClasspathsFromUserLibDir(@Nullable File userLibDir) - throws IOException { - if (userLibDir == null) { + private static List<URL> getClasspathsFromUserDir( + @Nullable File userDir, @Nullable File jarFile) throws IOException { + if (userDir == null) { return Collections.emptyList(); } + try (Stream<Path> files = Files.list(userDir.toPath())) { + return getClasspathsFromArtifacts(files, jarFile); + } + } + + private static List<URL> getClasspathsFromArtifacts( + @Nullable Collection<File> userArtifacts, @Nullable File jarFile) { + if (userArtifacts == null) { + return Collections.emptyList(); + } + + return getClasspathsFromArtifacts(userArtifacts.stream().map(File::toPath), jarFile); + } + + private static List<URL> getClasspathsFromArtifacts( + Stream<Path> userArtifacts, @Nullable File jarFile) { + checkNotNull(userArtifacts); + final Path workingDirectory = FileUtils.getCurrentWorkingDirectory(); final 
List<URL> relativeJarURLs = - FileUtils.listFilesInDirectory(userLibDir.toPath(), FileUtils::isJarFile).stream() + userArtifacts + .filter(path -> FileUtils.isJarFile(path) && !path.toFile().equals(jarFile)) .map(path -> FileUtils.relativizePath(workingDirectory, path)) .map(FunctionUtils.uncheckedFunction(FileUtils::toURL)) .collect(Collectors.toList()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java new file mode 100644 index 00000000000..20d491433bd --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetchManager.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.artifact; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.client.cli.ArtifactFetchOptions; +import org.apache.flink.client.program.DefaultPackagedProgramRetriever; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.function.FunctionUtils; + +import org.apache.commons.io.FilenameUtils; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URI; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Class that manages the artifact loading process. */ +public class ArtifactFetchManager { + + private final ArtifactFetcher localFetcher; + private final ArtifactFetcher fsFetcher; + private final ArtifactFetcher httpFetcher; + + private final Configuration conf; + private final File baseDir; + + public ArtifactFetchManager(Configuration conf) { + this(conf, null); + } + + public ArtifactFetchManager(Configuration conf, @Nullable String baseDir) { + this( + new LocalArtifactFetcher(), + new FsArtifactFetcher(), + new HttpArtifactFetcher(), + conf, + baseDir); + } + + @VisibleForTesting + ArtifactFetchManager( + ArtifactFetcher localFetcher, + ArtifactFetcher fsFetcher, + ArtifactFetcher httpFetcher, + Configuration conf, + @Nullable String baseDir) { + this.localFetcher = checkNotNull(localFetcher); + this.fsFetcher = checkNotNull(fsFetcher); + this.httpFetcher = checkNotNull(httpFetcher); + this.conf = checkNotNull(conf); + this.baseDir = + baseDir == null + ? new File(conf.get(ArtifactFetchOptions.BASE_DIR)) + : new File(baseDir); + } + + /** + * Fetches artifacts from a given URI string array. 
The job jar and any additional artifacts are + * mixed; in case of multiple artifacts, the {@link DefaultPackagedProgramRetriever} logic will + * be used to find the job jar. + * + * @param uris URIs to fetch + * @return result with the fetched artifacts + */ + public Result fetchArtifacts(String[] uris) { + checkArgument(uris != null && uris.length > 0, "At least one URI is required."); + + ArtifactUtils.createMissingParents(baseDir); + List<File> artifacts = + Arrays.stream(uris) + .map(FunctionUtils.uncheckedFunction(this::fetchArtifact)) + .collect(Collectors.toList()); + + if (artifacts.size() > 1) { + return new Result(null, artifacts); + } + + if (artifacts.size() == 1) { + return new Result(artifacts.get(0), null); + } + + // Should not happen. + throw new IllegalStateException("Corrupt artifact fetching state."); + } + + /** + * Fetches the job jar and any additional artifacts if the given list is not null or empty. + * + * @param jobUri URI of the job jar + * @param additionalUris URI(s) of any additional artifact to fetch + * @return result with the fetched artifacts + * @throws Exception if fetching any of the artifacts fails + */ + public Result fetchArtifacts(String jobUri, @Nullable List<String> additionalUris) + throws Exception { + checkArgument(jobUri != null && !jobUri.trim().isEmpty(), "The jobUri is required."); + + ArtifactUtils.createMissingParents(baseDir); + File jobJar = fetchArtifact(jobUri); + + List<File> additionalArtifacts = + additionalUris == null + ? 
Collections.emptyList() + : additionalUris.stream() + .map(FunctionUtils.uncheckedFunction(this::fetchArtifact)) + .collect(Collectors.toList()); + + return new Result(jobJar, additionalArtifacts); + } + + @VisibleForTesting + ArtifactFetcher getFetcher(URI uri) { + if ("local".equals(uri.getScheme())) { + return localFetcher; + } + + if (isRawHttp(uri.getScheme()) || "https".equals(uri.getScheme())) { + return httpFetcher; + } + + return fsFetcher; + } + + private File fetchArtifact(String uri) throws Exception { + URI resolvedUri = PackagedProgramUtils.resolveURI(uri); + File targetFile = new File(baseDir, FilenameUtils.getName(resolvedUri.getPath())); + if (targetFile.exists()) { + // Already fetched user artifacts are kept. + return targetFile; + } + + return getFetcher(resolvedUri).fetch(uri, conf, baseDir); + } + + private boolean isRawHttp(String uriScheme) { + if ("http".equals(uriScheme)) { + if (conf.getBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED)) { + return true; + } + throw new IllegalArgumentException( + String.format( + "Artifact fetching from raw HTTP endpoints is disabled. Set the '%s' property to override.", + ArtifactFetchOptions.RAW_HTTP_ENABLED.key())); + } + + return false; + } + + /** Artifact fetch result with all fetched artifact(s). */ + public static class Result { + + private final File jobJar; + private final List<File> artifacts; + + private Result(@Nullable File jobJar, @Nullable List<File> additionalJars) { + this.jobJar = jobJar; + this.artifacts = additionalJars == null ? Collections.emptyList() : additionalJars; + } + + @Nullable + public File getJobJar() { + return jobJar; + } + + @Nullable + public List<File> getArtifacts() { + return artifacts.isEmpty() ? 
null : Collections.unmodifiableList(artifacts); + } + } +} diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java index 8d43998079b..0740a2e6b4b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java @@ -22,8 +22,8 @@ import org.apache.flink.configuration.Configuration; import java.io.File; -/** The artifact fetcher. */ -public interface ArtifactFetcher { +/** Abstract artifact fetcher. */ +abstract class ArtifactFetcher { /** * Fetch the resource from the uri to the targetDir. @@ -34,5 +34,6 @@ public interface ArtifactFetcher { * @return The path of the fetched artifact. * @throws Exception Error during fetching the artifact. */ - File fetch(String uri, Configuration flinkConfiguration, File targetDir) throws Exception; + abstract File fetch(String uri, Configuration flinkConfiguration, File targetDir) + throws Exception; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java index aa1d6803a50..11249b65d98 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactUtils.java @@ -17,55 +17,43 @@ package org.apache.flink.client.program.artifact; -import org.apache.flink.client.program.PackagedProgramUtils; -import org.apache.flink.configuration.Configuration; import org.apache.flink.util.FlinkRuntimeException; import org.apache.commons.io.FileUtils; -import org.apache.commons.io.FilenameUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.net.URI; -/** Manage the user artifacts. 
*/ +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Artifact fetch related utils. */ public class ArtifactUtils { private static final Logger LOG = LoggerFactory.getLogger(ArtifactUtils.class); - private static synchronized void createIfNotExists(File targetDir) { - if (!targetDir.exists()) { + /** + * Creates missing parent directories for the given {@link File} if there are any. Does nothing + * otherwise. + * + * @param baseDir base dir to create parents for + */ + public static synchronized void createMissingParents(File baseDir) { + checkNotNull(baseDir, "Base dir has to be provided."); + + if (!baseDir.exists()) { try { - FileUtils.forceMkdirParent(targetDir); - LOG.info("Created dir: {}", targetDir); + FileUtils.forceMkdirParent(baseDir); + LOG.info("Created parents for base dir: {}", baseDir); } catch (Exception e) { throw new FlinkRuntimeException( - String.format("Failed to create the dir: %s", targetDir), e); + String.format("Failed to create parent(s) for given base dir: %s", baseDir), + e); } } } - public static File fetch(String jarURI, Configuration flinkConfiguration, String targetDirStr) - throws Exception { - URI uri = PackagedProgramUtils.resolveURI(jarURI); - if ("local".equals(uri.getScheme()) && uri.isAbsolute()) { - return new File(uri.getPath()); - } else { - File targetDir = new File(targetDirStr); - File targetFile = new File(targetDir, FilenameUtils.getName(uri.getPath())); - // user artifacts will be kept if enable emptyDir - if (!targetFile.exists()) { - createIfNotExists(targetDir); - if ("http".equals(uri.getScheme()) || "https".equals(uri.getScheme())) { - return HttpArtifactFetcher.INSTANCE.fetch( - jarURI, flinkConfiguration, targetDir); - } else { - return FileSystemBasedArtifactFetcher.INSTANCE.fetch( - jarURI, flinkConfiguration, targetDir); - } - } - return targetFile; - } + private ArtifactUtils() { + throw new UnsupportedOperationException("This class should never be instantiated."); } } diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FileSystemBasedArtifactFetcher.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FsArtifactFetcher.java similarity index 75% rename from flink-clients/src/main/java/org/apache/flink/client/program/artifact/FileSystemBasedArtifactFetcher.java rename to flink-clients/src/main/java/org/apache/flink/client/program/artifact/FsArtifactFetcher.java index ebf2bd330a4..caa40858fe2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FileSystemBasedArtifactFetcher.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/FsArtifactFetcher.java @@ -20,6 +20,7 @@ package org.apache.flink.client.program.artifact; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -27,17 +28,14 @@ import org.slf4j.LoggerFactory; import java.io.File; -/** Leverage the flink filesystem plugin to fetch the artifact. */ -public class FileSystemBasedArtifactFetcher implements ArtifactFetcher { +/** Copies artifact via the Flink filesystem plugin. 
*/ +class FsArtifactFetcher extends ArtifactFetcher { - public static final Logger LOG = LoggerFactory.getLogger(FileSystemBasedArtifactFetcher.class); - public static final FileSystemBasedArtifactFetcher INSTANCE = - new FileSystemBasedArtifactFetcher(); + private static final Logger LOG = LoggerFactory.getLogger(FsArtifactFetcher.class); @Override - public File fetch(String uri, Configuration flinkConfiguration, File targetDir) - throws Exception { - org.apache.flink.core.fs.Path source = new org.apache.flink.core.fs.Path(uri); + File fetch(String uri, Configuration flinkConf, File targetDir) throws Exception { + Path source = new Path(uri); long start = System.currentTimeMillis(); FileSystem fileSystem = source.getFileSystem(); String fileName = source.getName(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java index 88ef921e863..70303f85f77 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/HttpArtifactFetcher.java @@ -26,25 +26,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.Map; -/** Download the jar from the http resource. */ -public class HttpArtifactFetcher implements ArtifactFetcher { +/** Downloads artifact from an HTTP resource. 
*/ +class HttpArtifactFetcher extends ArtifactFetcher { public static final Logger LOG = LoggerFactory.getLogger(HttpArtifactFetcher.class); - public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher(); @Override - public File fetch(String uri, Configuration flinkConfiguration, File targetDir) - throws Exception { + File fetch(String uri, Configuration flinkConf, File targetDir) throws IOException { long start = System.currentTimeMillis(); URL url = new URL(uri); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); - Map<String, String> headers = - flinkConfiguration.get(ArtifactFetchOptions.USER_ARTIFACT_HTTP_HEADER); + Map<String, String> headers = flinkConf.get(ArtifactFetchOptions.HTTP_HEADERS); if (headers != null) { headers.forEach(conn::setRequestProperty); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/LocalArtifactFetcher.java b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/LocalArtifactFetcher.java new file mode 100644 index 00000000000..41c13c64bb7 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/artifact/LocalArtifactFetcher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.artifact; + +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URI; + +/** Retrieves a local artifact as a valid {@link File}. */ +class LocalArtifactFetcher extends ArtifactFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(LocalArtifactFetcher.class); + + @Override + File fetch(String uri, Configuration flinkConf, File targetDir) throws Exception { + URI resolvedUri = PackagedProgramUtils.resolveURI(uri); + File targetFile = new File(resolvedUri.getPath()); + LOG.debug("Retrieved local file from {} as {}", uri, targetFile); + + return targetFile; + } +} diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java index f8cb80e32b4..431bba61613 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java @@ -45,6 +45,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -70,6 +71,10 @@ class DefaultPackagedProgramRetrieverITCase { ClasspathProviderExtension testJobEntryClassClasspathProvider = ClasspathProviderExtension.createWithTestJobOnly(); + @RegisterExtension + ClasspathProviderExtension additionalArtifactClasspathProvider = + ClasspathProviderExtension.createWithAdditionalArtifact(); + @Test void testDeriveEntryClassInformationForCustomJar() throws 
FlinkException, MalformedURLException { @@ -319,18 +324,33 @@ class DefaultPackagedProgramRetrieverITCase { @Test void testWithoutJobClassAndMultipleEntryClassesOnUserClasspath() { + // without a job class name specified deriving the entry class from classpath is impossible + // if the classpath contains multiple classes with main methods + final String errorMsg = "Multiple JAR archives with entry classes found on classpath."; assertThatThrownBy( - () -> { - // without a job class name specified deriving the entry class from - // classpath is impossible - // if the classpath contains multiple classes with main methods - DefaultPackagedProgramRetriever.create( - multipleEntryClassesClasspathProvider.getDirectory(), - null, - new String[0], - new Configuration()); - }) - .isInstanceOf(FlinkException.class); + () -> + DefaultPackagedProgramRetriever.create( + multipleEntryClassesClasspathProvider.getDirectory(), + null, + new String[0], + new Configuration())) + .isInstanceOf(FlinkException.class) + .hasMessageContaining(errorMsg); + + assertThatThrownBy( + () -> + DefaultPackagedProgramRetriever.create( + null, + null, + Arrays.asList( + multipleEntryClassesClasspathProvider + .getDirectory() + .listFiles()), + null, + new String[0], + new Configuration())) + .isInstanceOf(FlinkException.class) + .hasMessageContaining(errorMsg); } @Test @@ -475,6 +495,89 @@ class DefaultPackagedProgramRetrieverITCase { assertThat(actualClasspath).isEqualTo(expectedClasspath); } + @Test + void testRetrieveFromJarFileWithArtifacts() + throws IOException, FlinkException, ProgramInvocationException { + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + null, + // the testJob jar is not on the user classpath + testJobEntryClassClasspathProvider.getJobJar(), + Arrays.asList( + additionalArtifactClasspathProvider.getDirectory().listFiles()), + null, + ClasspathProviderExtension.parametersForTestJob("suffix"), + new Configuration()); + final 
JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat(jobGraph.getUserJars()) + .contains( + new org.apache.flink.core.fs.Path( + testJobEntryClassClasspathProvider.getJobJar().toURI())); + final List<String> actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + final List<String> expectedClasspath = + extractRelativizedURLsForJarsFromDirectory( + additionalArtifactClasspathProvider.getDirectory()); + + assertThat(actualClasspath).isEqualTo(expectedClasspath); + } + + @Test + void testRetrieveFromJarFileWithUserAndArtifactLib() + throws IOException, FlinkException, ProgramInvocationException { + final PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + singleEntryClassClasspathProvider.getDirectory(), + // the testJob jar is not on the user classpath + testJobEntryClassClasspathProvider.getJobJar(), + Arrays.asList( + additionalArtifactClasspathProvider.getDirectory().listFiles()), + null, + ClasspathProviderExtension.parametersForTestJob("suffix"), + new Configuration()); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + assertThat(jobGraph.getUserJars()) + .contains( + new org.apache.flink.core.fs.Path( + testJobEntryClassClasspathProvider.getJobJar().toURI())); + final List<String> actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + final List<String> expectedClasspath = new ArrayList<>(); + expectedClasspath.addAll( + extractRelativizedURLsForJarsFromDirectory( + singleEntryClassClasspathProvider.getDirectory())); + expectedClasspath.addAll( + extractRelativizedURLsForJarsFromDirectory( + additionalArtifactClasspathProvider.getDirectory())); + + assertThat(actualClasspath).isEqualTo(expectedClasspath); + } + + @Test + void testRetrieveFromArtifactLibWithoutJarFile() + throws IOException, FlinkException, ProgramInvocationException { + final 
PackagedProgramRetriever retrieverUnderTest = + DefaultPackagedProgramRetriever.create( + null, + null, + Arrays.asList( + multipleEntryClassesClasspathProvider.getDirectory().listFiles()), + multipleEntryClassesClasspathProvider.getJobClassName(), + ClasspathProviderExtension.parametersForTestJob("suffix"), + new Configuration()); + final JobGraph jobGraph = retrieveJobGraph(retrieverUnderTest, new Configuration()); + + final List<String> actualClasspath = + jobGraph.getClasspaths().stream().map(URL::toString).collect(Collectors.toList()); + final List<String> expectedClasspath = + extractRelativizedURLsForJarsFromDirectory( + multipleEntryClassesClasspathProvider.getDirectory()); + + assertThat(actualClasspath).isEqualTo(expectedClasspath); + } + @Test void testChildFirstDefaultConfiguration() throws FlinkException { // this is a sanity check to backup testConfigurationIsConsidered diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java new file mode 100644 index 00000000000..f41c8237a31 --- /dev/null +++ b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactFetchManagerTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.artifact; + +import org.apache.flink.client.cli.ArtifactFetchOptions; +import org.apache.flink.configuration.Configuration; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.BindException; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.Collections; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link ArtifactFetchManager}. 
*/ +class ArtifactFetchManagerTest { + + private static final Logger LOG = LoggerFactory.getLogger(ArtifactFetchManagerTest.class); + + @TempDir private Path tempDir; + + private Configuration configuration; + + @BeforeEach + void setup() { + configuration = new Configuration(); + configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString()); + } + + @Test + void testGetFetcher() throws Exception { + configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true); + ArtifactFetchManager fetchManager = new ArtifactFetchManager(configuration); + + ArtifactFetcher fetcher = fetchManager.getFetcher(new URI("local:///a.jar")); + assertThat(fetcher).isInstanceOf(LocalArtifactFetcher.class); + + fetcher = fetchManager.getFetcher(new URI("http://0.0.0.0:123/a.jar")); + assertThat(fetcher).isInstanceOf(HttpArtifactFetcher.class); + + fetcher = fetchManager.getFetcher(new URI("https://0.0.0.0:123/a.jar")); + assertThat(fetcher).isInstanceOf(HttpArtifactFetcher.class); + + fetcher = fetchManager.getFetcher(new URI("hdfs:///tmp/a.jar")); + assertThat(fetcher).isInstanceOf(FsArtifactFetcher.class); + + fetcher = fetchManager.getFetcher(new URI("s3a:///tmp/a.jar")); + assertThat(fetcher).isInstanceOf(FsArtifactFetcher.class); + } + + @Test + void testFileSystemFetchWithoutAdditionalUri() throws Exception { + File sourceFile = getDummyArtifact(getClass()); + String uriStr = "file://" + sourceFile.toURI().getPath(); + + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration); + ArtifactFetchManager.Result res = fetchMgr.fetchArtifacts(uriStr, null); + assertThat(res.getJobJar()).exists(); + assertThat(res.getJobJar().getName()).isEqualTo(sourceFile.getName()); + assertThat(res.getArtifacts()).isNull(); + } + + @Test + void testFileSystemFetchWithAdditionalUri() throws Exception { + File sourceFile = getDummyArtifact(getClass()); + String uriStr = "file://" + sourceFile.toURI().getPath(); + File additionalSrcFile = 
getFlinkClientsJar(); + String additionalUriStr = "file://" + additionalSrcFile.toURI().getPath(); + + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration); + ArtifactFetchManager.Result res = + fetchMgr.fetchArtifacts(uriStr, Collections.singletonList(additionalUriStr)); + assertThat(res.getJobJar()).exists(); + assertThat(res.getJobJar().getName()).isEqualTo(sourceFile.getName()); + assertThat(res.getArtifacts()).hasSize(1); + File additionalFetched = res.getArtifacts().get(0); + assertThat(additionalFetched.getName()).isEqualTo(additionalSrcFile.getName()); + } + + @Test + void testHttpFetch() throws Exception { + configuration.setBoolean(ArtifactFetchOptions.RAW_HTTP_ENABLED, true); + HttpServer httpServer = null; + try { + httpServer = startHttpServer(); + File sourceFile = getFlinkClientsJar(); + httpServer.createContext( + "/download/" + sourceFile.getName(), new DummyHttpDownloadHandler(sourceFile)); + String uriStr = + String.format( + "http://127.0.0.1:%d/download/" + sourceFile.getName(), + httpServer.getAddress().getPort()); + + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration); + ArtifactFetchManager.Result res = fetchMgr.fetchArtifacts(new String[] {uriStr}); + assertThat(res.getJobJar()).isNotNull(); + assertThat(res.getArtifacts()).isNull(); + assertFetchedFile(res.getJobJar(), sourceFile); + } finally { + if (httpServer != null) { + httpServer.stop(0); + } + } + } + + @Test + void testMixedArtifactFetch() throws Exception { + File sourceFile = getDummyArtifact(getClass()); + String uriStr = "file://" + sourceFile.toURI().getPath(); + File sourceFile2 = getFlinkClientsJar(); + String uriStr2 = "file://" + sourceFile2.toURI().getPath(); + + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration); + ArtifactFetchManager.Result res = fetchMgr.fetchArtifacts(new String[] {uriStr, uriStr2}); + assertThat(res.getJobJar()).isNull(); + assertThat(res.getArtifacts()).hasSize(2); + 
assertFetchedFile(res.getArtifacts().get(0), sourceFile); + assertFetchedFile(res.getArtifacts().get(1), sourceFile2); + } + + @Test + void testNoFetchOverride() throws Exception { + DummyFetcher dummyFetcher = new DummyFetcher(); + ArtifactFetchManager fetchMgr = + new ArtifactFetchManager( + dummyFetcher, dummyFetcher, dummyFetcher, configuration, null); + + File sourceFile = getDummyArtifact(getClass()); + Path destFile = tempDir.resolve(sourceFile.getName()); + Files.copy(sourceFile.toPath(), destFile); + + String uriStr = "file://" + sourceFile.toURI().getPath(); + fetchMgr.fetchArtifacts(uriStr, null); + + assertThat(dummyFetcher.fetchCount).isZero(); + } + + @Test + void testHttpDisabledError() { + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration); + assertThatThrownBy( + () -> + fetchMgr.fetchArtifacts( + "http://127.0.0.1:1234/download/notexists.jar", null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("raw HTTP endpoints are disabled"); + } + + @Test + void testMissingRequiredFetchArgs() { + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration); + assertThatThrownBy(() -> fetchMgr.fetchArtifacts(null, null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The jobUri is required."); + + assertThatThrownBy(() -> fetchMgr.fetchArtifacts(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("At least one URI is required."); + + assertThatThrownBy(() -> fetchMgr.fetchArtifacts(new String[0])) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("At least one URI is required."); + } + + private void assertFetchedFile(File actual, File expected) { + assertThat(actual).exists(); + assertThat(actual.getName()).isEqualTo(expected.getName()); + assertThat(actual.length()).isEqualTo(expected.length()); + } + + private HttpServer startHttpServer() throws IOException { + int port = RandomUtils.nextInt(2000, 3000); + HttpServer httpServer = null; + while (httpServer 
== null && port <= 65535) { + try { + httpServer = HttpServer.create(new InetSocketAddress(port), 0); + httpServer.setExecutor(null); + httpServer.start(); + } catch (BindException e) { + LOG.warn("Failed to start http server", e); + port++; + } + } + return httpServer; + } + + private File getDummyArtifact(Class<?> cls) { + String className = String.format("%s.class", cls.getSimpleName()); + URL url = cls.getResource(className); + assertThat(url).isNotNull(); + + return new File(url.getPath()); + } + + private File getFlinkClientsJar() throws IOException { + String pathStr = + ArtifactFetchManager.class + .getProtectionDomain() + .getCodeSource() + .getLocation() + .getPath(); + Path mvnTargetDir = Paths.get(pathStr).getParent(); + + Collection<Path> jarPaths = + org.apache.flink.util.FileUtils.listFilesInDirectory( + mvnTargetDir, + p -> + org.apache.flink.util.FileUtils.isJarFile(p) + && p.toFile().getName().startsWith("flink-clients") + && !p.toFile().getName().contains("test-utils")); + + assertThat(jarPaths).isNotEmpty(); + + return jarPaths.iterator().next().toFile(); + } + + private static class DummyHttpDownloadHandler implements HttpHandler { + + final File file; + + DummyHttpDownloadHandler(File fileToDownload) { + checkArgument(fileToDownload.exists(), "The file to download does not exist!"); + this.file = fileToDownload; + } + + @Override + public void handle(HttpExchange exchange) throws IOException { + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, file.length()); + FileUtils.copyFile(this.file, exchange.getResponseBody()); + exchange.close(); + } + } + + private static class DummyFetcher extends ArtifactFetcher { + + int fetchCount = 0; + + @Override + File fetch(String uri, Configuration flinkConfiguration, File targetDir) { + ++fetchCount; + return null; + } + } +} diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java index 3c19636e385..7441d804700 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/artifact/ArtifactUtilsTest.java @@ -17,130 +17,23 @@ package org.apache.flink.client.program.artifact; -import org.apache.flink.client.cli.ArtifactFetchOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Preconditions; - -import com.sun.net.httpserver.HttpExchange; -import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.RandomUtils; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; -import java.net.BindException; -import java.net.HttpURLConnection; -import java.net.InetSocketAddress; -import java.net.URL; import java.nio.file.Path; -import java.util.HashMap; -/** Test for {@link ArtifactUtils}. */ -public class ArtifactUtilsTest { - private static final Logger LOG = LoggerFactory.getLogger(ArtifactUtilsTest.class); - private Configuration configuration; - @TempDir Path tempDir; +import static org.assertj.core.api.Assertions.assertThat; - @BeforeEach - public void setup() { - configuration = new Configuration(); - configuration.setString( - ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, tempDir.toAbsolutePath().toString()); - } +/** Tests for {@link ArtifactUtils}. 
*/ +class ArtifactUtilsTest { @Test - public void testFilesystemFetch() throws Exception { - File sourceFile = mockTheJarFile(); - File file = - ArtifactUtils.fetch( - String.format("file://%s", sourceFile.toURI().getPath()), - configuration, - tempDir.toString()); - Assertions.assertTrue(file.exists()); - Assertions.assertEquals(tempDir.toString(), file.getParentFile().toString()); - } - - @Test - public void testHttpFetch() throws Exception { - HttpServer httpServer = null; - try { - httpServer = startHttpServer(); - File sourceFile = mockTheJarFile(); - httpServer.createContext( - "/download/major.jar", new DownloadFileHttpHandler(sourceFile)); - - File file = - ArtifactUtils.fetch( - String.format( - "http://127.0.0.1:%d/download/major.jar", - httpServer.getAddress().getPort()), - new Configuration() - .set( - ArtifactFetchOptions.USER_ARTIFACT_HTTP_HEADER, - new HashMap<String, String>() { - { - put("k1", "v1"); - } - }), - tempDir.toString()); - Assertions.assertTrue(file.exists()); - Assertions.assertEquals(tempDir.toString(), file.getParent()); - Assertions.assertEquals("major.jar", file.getName()); - } finally { - if (httpServer != null) { - httpServer.stop(0); - } - } - } - - private HttpServer startHttpServer() throws IOException { - int port = RandomUtils.nextInt(2000, 3000); - HttpServer httpServer = null; - while (httpServer == null && port <= 65536) { - try { - httpServer = HttpServer.create(new InetSocketAddress(port), 0); - httpServer.setExecutor(null); - httpServer.start(); - } catch (BindException e) { - LOG.warn("Failed to start http server", e); - port++; - } - } - return httpServer; - } - - private File mockTheJarFile() { - String className = String.format("%s.class", ArtifactUtilsTest.class.getSimpleName()); - URL url = ArtifactUtilsTest.class.getResource(className); - Assertions.assertNotNull(url); - return new File(url.getPath()); - } - - /** Handler to mock download file. 
*/ - public static class DownloadFileHttpHandler implements HttpHandler { - - private final File file; - private final String contentType = "application/octet-stream"; - - public DownloadFileHttpHandler(File fileToDownload) { - Preconditions.checkArgument( - fileToDownload.exists(), "The file to be download not exists!"); - this.file = fileToDownload; - } + void testCreateMissingParents(@TempDir Path tempDir) { + File targetDir = tempDir.resolve("p1").resolve("p2").resolve("base-dir").toFile(); + assertThat(targetDir.getParentFile().getParentFile()).doesNotExist(); - @Override - public void handle(HttpExchange exchange) throws IOException { - exchange.getResponseHeaders().add("Content-Type", contentType); - exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, file.length()); - FileUtils.copyFile(this.file, exchange.getResponseBody()); - exchange.close(); - } + ArtifactUtils.createMissingParents(targetDir); + assertThat(targetDir.getParentFile()).isDirectory(); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java index 46097bbf5ec..f53ae539774 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java +++ b/flink-clients/src/test/java/org/apache/flink/client/testjar/ClasspathProviderExtension.java @@ -58,6 +58,9 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach private static final Path JOB_LIB_JAR_PATH = Paths.get("target", "flink-clients-test-utils-job-lib-jar.jar"); + private static final Path ADDITIONAL_ARTIFACT_JAR_PATH = + Paths.get("target", "flink-clients-test-utils-additional-artifact-jar.jar"); + protected File temporaryFolder = org.assertj.core.util.Files.newTemporaryFolder(); private final String directoryNameSuffix; @@ -110,6 +113,14 @@ public class ClasspathProviderExtension implements BeforeEachCallback, AfterEach 
TEST_JOB_JAR_PATH.toFile()); } + public static ClasspathProviderExtension createWithAdditionalArtifact() { + return new ClasspathProviderExtension( + "_user_dir_with_additional_artifact", + directory -> copyJar(ADDITIONAL_ARTIFACT_JAR_PATH, directory), + ADDITIONAL_ARTIFACT_JAR_PATH.toFile(), + null); + } + public static String[] parametersForTestJob(String strValue) { return new String[] {"--arg", strValue}; } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java index e94ab2cda64..6f3da432815 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfiguration.java @@ -38,7 +38,7 @@ final class StandaloneApplicationClusterConfiguration extends EntrypointClusterC @Nullable private final String jobClassName; - @Nullable private final String jarFile; + @Nullable private final String[] jars; StandaloneApplicationClusterConfiguration( @Nonnull String configDir, @@ -49,13 +49,17 @@ final class StandaloneApplicationClusterConfiguration extends EntrypointClusterC @Nonnull SavepointRestoreSettings savepointRestoreSettings, @Nullable JobID jobId, @Nullable String jobClassName, - @Nullable String jarFile) { + @Nullable String[] jars) { super(configDir, dynamicProperties, args, hostname, restPort); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.jobId = jobId; this.jobClassName = jobClassName; - this.jarFile = jarFile; + this.jars = jars; + } + + boolean hasJars() { + return jars != null && jars.length > 0; } @Nonnull @@ -74,7 +78,7 @@ final class StandaloneApplicationClusterConfiguration extends EntrypointClusterC } @Nullable - String getJarFile() { - 
return jarFile; + String[] getJars() { + return jars; } } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java index f00c3eddc93..112a65dd18f 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java @@ -63,20 +63,21 @@ public class StandaloneApplicationClusterConfigurationParserFactory .desc("Job ID of the job to run.") .build(); - private static final Option JOB_JAR_FILE = - Option.builder("jarfile") - .longOpt("jar-file") + private static final Option JARS_OPTION = + Option.builder("jars") + .longOpt("jars") + .required(false) - .hasArg(true) - .argName("job jar file") - .desc("Jar File of the job to run.") + .hasArgs() + .valueSeparator(',') + .argName("jar file(s) for job") + .desc("Jar file(s) of the job to run.") + .build(); @Override public Options getOptions() { final Options options = new Options(); options.addOption(CONFIG_DIR_OPTION); - options.addOption(JOB_JAR_FILE); + options.addOption(JARS_OPTION); options.addOption(REST_PORT_OPTION); options.addOption(JOB_CLASS_NAME_OPTION); options.addOption(JOB_ID_OPTION); @@ -100,7 +101,7 @@ public class StandaloneApplicationClusterConfigurationParserFactory CliFrontendParser.createSavepointRestoreSettings(commandLine); final JobID jobId = getJobId(commandLine); final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt()); - final String jarFile = commandLine.getOptionValue(JOB_JAR_FILE.getOpt()); + final String[] jarFiles = commandLine.getOptionValues(JARS_OPTION.getOpt()); return new StandaloneApplicationClusterConfiguration( configDir, @@ -111,7 +112,7 @@ public 
class StandaloneApplicationClusterConfigurationParserFactory savepointRestoreSettings, jobId, jobClassName, - jarFile); + jarFiles); } private int getRestPort(CommandLine commandLine) throws FlinkParseException { diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java index 3b525394b1d..ca77c16768b 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPoint.java @@ -21,14 +21,12 @@ package org.apache.flink.container.entrypoint; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; -import org.apache.flink.client.cli.ArtifactFetchOptions; import org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint; import org.apache.flink.client.program.DefaultPackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; -import org.apache.flink.client.program.artifact.ArtifactUtils; +import org.apache.flink.client.program.artifact.ArtifactFetchManager; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginManager; @@ -42,13 +40,9 @@ import org.apache.flink.runtime.security.contexts.SecurityContext; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; -import org.apache.flink.util.FlinkException; -import 
org.apache.flink.util.function.FunctionUtils; import java.io.File; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; +import java.util.Collection; /** An {@link ApplicationClusterEntryPoint} which is started with a job in a predefined location. */ @Internal @@ -122,7 +116,6 @@ public final class StandaloneApplicationClusterEntryPoint extends ApplicationClu StandaloneApplicationClusterConfiguration clusterConfiguration) { Configuration configuration = loadConfiguration(clusterConfiguration); setStaticJobId(clusterConfiguration, configuration); - setJobJarFile(clusterConfiguration, configuration); SavepointRestoreSettings.toConfiguration( clusterConfiguration.getSavepointRestoreSettings(), configuration); return configuration; @@ -131,16 +124,25 @@ public final class StandaloneApplicationClusterEntryPoint extends ApplicationClu private static PackagedProgram getPackagedProgram( final StandaloneApplicationClusterConfiguration clusterConfiguration, Configuration flinkConfiguration) - throws FlinkException { + throws Exception { final File userLibDir = ClusterEntrypointUtils.tryFindUserLibDirectory().orElse(null); - File jarFile = - clusterConfiguration.getJarFile() == null - ? 
null - : fetchJarFileForApplicationMode(flinkConfiguration).get(0); + + File jobJar = null; + Collection<File> artifacts = null; + if (clusterConfiguration.hasJars()) { + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(flinkConfiguration); + ArtifactFetchManager.Result res = + fetchMgr.fetchArtifacts(clusterConfiguration.getJars()); + + jobJar = res.getJobJar(); + artifacts = res.getArtifacts(); + } + final PackagedProgramRetriever programRetriever = DefaultPackagedProgramRetriever.create( userLibDir, - jarFile, + jobJar, + artifacts, clusterConfiguration.getJobClassName(), clusterConfiguration.getArgs(), flinkConfiguration); @@ -155,26 +157,4 @@ public final class StandaloneApplicationClusterEntryPoint extends ApplicationClu configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); } } - - private static void setJobJarFile( - StandaloneApplicationClusterConfiguration clusterConfiguration, - Configuration configuration) { - final String jarFile = clusterConfiguration.getJarFile(); - if (jarFile != null) { - configuration.set(PipelineOptions.JARS, Collections.singletonList(jarFile)); - } - } - - private static List<File> fetchJarFileForApplicationMode(Configuration configuration) { - String targetDir = generateJarDir(configuration); - return configuration.get(PipelineOptions.JARS).stream() - .map( - FunctionUtils.uncheckedFunction( - uri -> ArtifactUtils.fetch(uri, configuration, targetDir))) - .collect(Collectors.toList()); - } - - public static String generateJarDir(Configuration configuration) { - return configuration.get(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR); - } } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java index 37bde570fad..d7a80774cf5 100644 --- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.configuration.PipelineOptionsInternal; import org.apache.flink.configuration.RestOptions; import org.apache.flink.runtime.entrypoint.FlinkParseException; @@ -63,7 +62,10 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { new StandaloneApplicationClusterConfigurationParserFactory()); private static final String JOB_CLASS_NAME = "foobar"; - private static final String JOB_JAR_FILE = "local:///opt/flink/artifacts/my-flink-job.jar"; + private static final String[] JOB_JARS = { + "local:///opt/flink/artifacts/my-flink-job.jar", + "local:///opt/flink/artifacts/my-additional-dep.jar" + }; @Test void testEntrypointClusterConfigurationToConfigurationParsing() throws FlinkParseException { @@ -87,9 +89,9 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { String.valueOf(restPort), "--job-classname", JOB_CLASS_NAME, + "--jars", + String.join(",", JOB_JARS), String.format("-D%s=%s", key, value), - "--jar-file", - JOB_JAR_FILE, arg1, arg2 }; @@ -97,6 +99,7 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { final StandaloneApplicationClusterConfiguration clusterConfiguration = commandLineParser.parse(args); assertThat(clusterConfiguration.getJobClassName()).isEqualTo(JOB_CLASS_NAME); + assertThat(clusterConfiguration.getJars()).isEqualTo(JOB_JARS); assertThat(clusterConfiguration.getArgs()).contains(arg1, arg2); final Configuration configuration = @@ 
-110,7 +113,6 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { assertThat(configuration.get(RestOptions.PORT)).isEqualTo(restPort); assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo(value); - assertThat(configuration.get(PipelineOptions.JARS).get(0)).isEqualTo(JOB_JAR_FILE); } @Test @@ -244,7 +246,7 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { final String jobClassName = JOB_CLASS_NAME; final JobID jobId = new JobID(); final String savepointRestorePath = "s3://foo/bar"; - final String jobJarFile = JOB_JAR_FILE; + final String jars = String.join(",", JOB_JARS); final String[] args = { "-c", confDirPath, @@ -254,8 +256,8 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { jobId.toString(), "-s", savepointRestorePath, - "-jarfile", - jobJarFile, + "-jars", + jars, "-n" }; @@ -265,6 +267,7 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { assertThat(clusterConfiguration.getConfigDir()).isEqualTo(confDirPath); assertThat(clusterConfiguration.getJobClassName()).isEqualTo(jobClassName); assertThat(clusterConfiguration.getJobId()).isEqualTo(jobId); + assertThat(clusterConfiguration.getJars()).isEqualTo(JOB_JARS); final SavepointRestoreSettings savepointRestoreSettings = clusterConfiguration.getSavepointRestoreSettings(); @@ -285,12 +288,17 @@ class StandaloneApplicationClusterConfigurationParserFactoryTest { } @Test - void testJarFileOption() throws FlinkParseException { + void testJarsOption() throws FlinkParseException { final String[] args = { - "--configDir", confDirPath, "--job-classname", "foobar", "--jar-file", JOB_JAR_FILE + "--configDir", + confDirPath, + "--job-classname", + "foobar", + "--jars", + String.join(",", JOB_JARS) }; final StandaloneApplicationClusterConfiguration applicationClusterConfiguration = commandLineParser.parse(args); - assertThat(applicationClusterConfiguration.getJarFile()).isEqualTo(JOB_JAR_FILE); + 
assertThat(applicationClusterConfiguration.getJars()).containsExactlyInAnyOrder(JOB_JARS); } } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPointTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPointTest.java deleted file mode 100644 index 8efe5f329a4..00000000000 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterEntryPointTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.container.entrypoint; - -import org.apache.flink.client.cli.ArtifactFetchOptions; -import org.apache.flink.configuration.Configuration; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.nio.file.Path; - -/** Tests for the {@link StandaloneApplicationClusterEntryPointTest}. 
*/ -public class StandaloneApplicationClusterEntryPointTest { - - private static final Logger LOG = - LoggerFactory.getLogger(StandaloneApplicationClusterEntryPointTest.class); - - private Configuration configuration; - @TempDir Path tempDir; - - @BeforeEach - public void setup() { - configuration = new Configuration(); - configuration.setString( - ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, tempDir.toAbsolutePath().toString()); - } - - @Test - public void testGenerateJarDir() { - String baseDir = StandaloneApplicationClusterEntryPoint.generateJarDir(configuration); - String expectedDir = String.join(File.separator, new String[] {tempDir.toString()}); - Assertions.assertEquals(expectedDir, baseDir); - } -} diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java index 460ad9e5288..8e680de327b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java @@ -63,7 +63,8 @@ public final class GlobalConfiguration { "service-key", "token", "basic-auth", - "jaas.config" + "jaas.config", + "http-headers" }; // the hidden content to be displayed diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java index 7a35102f2ce..27382e41be4 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/configuration/KubernetesConfigOptions.java @@ -20,7 +20,6 @@ package org.apache.flink.kubernetes.configuration; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.docs.Documentation; -import 
org.apache.flink.client.cli.ArtifactFetchOptions; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ExternalResourceOptions; @@ -506,33 +505,6 @@ public class KubernetesConfigOptions { + "Flink. A typical use-case is when one uses Flink Kubernetes " + "Operator."); - public static final ConfigOption<Map<String, String>> KUBERNETES_USER_ARTIFACT_HTTP_HEADER = - ConfigOptions.key("kubernetes.user.artifacts.http.header") - .mapType() - .noDefaultValue() - .withDescription( - "Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the application job artifacts. " - + "Expected format: headerKey1:headerValue1,headerKey2:headerValue2."); - - public static final ConfigOption<String> KUBERNETES_USER_ARTIFACTS_BASE_DIR = - ConfigOptions.key("kubernetes.user.artifacts.base.dir") - .stringType() - .defaultValue("/opt/flink/artifacts") - .withDescription("The base dir to put the application job artifacts."); - - public static final ConfigOption<Boolean> KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE = - ConfigOptions.key("kubernetes.user.artifacts.emptyDir.enable") - .booleanType() - .defaultValue(true) - .withDescription( - Description.builder() - .text( - "Whether to enable create mount an empty dir for %s to keep user artifacts if container restart.", - code( - ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR - .key())) - .build()); - /** * This will only be used to support blocklist mechanism, which is experimental currently, so we * do not want to expose this option in the documentation. 
diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java index 69cdb0c0a33..e3c7672122c 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java @@ -26,7 +26,7 @@ import org.apache.flink.client.program.DefaultPackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramRetriever; import org.apache.flink.client.program.PackagedProgramUtils; -import org.apache.flink.client.program.artifact.ArtifactUtils; +import org.apache.flink.client.program.artifact.ArtifactFetchManager; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.fs.FileSystem; @@ -41,14 +41,14 @@ import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.function.FunctionUtils; import javax.annotation.Nullable; import java.io.File; +import java.util.Collections; import java.util.List; -import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; /** An {@link ApplicationClusterEntryPoint} for Kubernetes. */ @Internal @@ -126,11 +126,12 @@ public final class KubernetesApplicationClusterEntrypoint extends ApplicationClu // No need to do pipelineJars validation if it is a PyFlink job. 
if (!(PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments))) { - final List<File> pipelineJarFiles = fetchJarFileForApplicationMode(configuration); - Preconditions.checkArgument(pipelineJarFiles.size() == 1, "Should only have one jar"); + final ArtifactFetchManager.Result fetchRes = fetchArtifacts(configuration); + return DefaultPackagedProgramRetriever.create( userLibDir, - pipelineJarFiles.get(0), + fetchRes.getJobJar(), + fetchRes.getArtifacts(), jobClassName, programArguments, configuration); @@ -140,29 +141,29 @@ public final class KubernetesApplicationClusterEntrypoint extends ApplicationClu userLibDir, jobClassName, programArguments, configuration); } - /** - * Fetch the user jar from path. - * - * @param configuration Flink Configuration - * @return User jar File - */ - public static List<File> fetchJarFileForApplicationMode(Configuration configuration) { - String targetDir = generateJarDir(configuration); - return configuration.get(PipelineOptions.JARS).stream() - .map( - FunctionUtils.uncheckedFunction( - uri -> ArtifactUtils.fetch(uri, configuration, targetDir))) - .collect(Collectors.toList()); + private static ArtifactFetchManager.Result fetchArtifacts(Configuration configuration) { + try { + String targetDir = generateJarDir(configuration); + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration, targetDir); + + List<String> uris = configuration.get(PipelineOptions.JARS); + checkArgument(uris.size() == 1, "Should only have one jar"); + List<String> additionalUris = + configuration + .getOptional(ArtifactFetchOptions.ARTIFACT_LIST) + .orElse(Collections.emptyList()); + + return fetchMgr.fetchArtifacts(uris.get(0), additionalUris); + } catch (Exception e) { + throw new RuntimeException(e); + } } - public static String generateJarDir(Configuration configuration) { + static String generateJarDir(Configuration configuration) { return String.join( File.separator, - new String[] { - new 
File(configuration.get(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR)) - .getAbsolutePath(), - configuration.get(KubernetesConfigOptions.NAMESPACE), - configuration.get(KubernetesConfigOptions.CLUSTER_ID) - }); + new File(configuration.get(ArtifactFetchOptions.BASE_DIR)).getAbsolutePath(), + configuration.get(KubernetesConfigOptions.NAMESPACE), + configuration.get(KubernetesConfigOptions.CLUSTER_ID)); } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java index 963cef9015c..00977ef8944 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java @@ -91,6 +91,16 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator { .withDnsPolicy(dnsPolicy) .endSpec(); + // Specify volume for user artifact(s) + basicPodBuilder + .editOrNewSpec() + .addNewVolume() + .withName(Constants.USER_ARTIFACTS_VOLUME) + .withNewEmptyDir() + .endEmptyDir() + .endVolume() + .endSpec(); + // Merge fields basicPodBuilder .editOrNewMetadata() @@ -107,17 +117,7 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator { .endSpec(); final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer()); - if (flinkConfig.getBoolean( - KubernetesConfigOptions.KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE)) { - basicPodBuilder - .editOrNewSpec() - .addNewVolume() - .withName(Constants.USER_ARTIFACTS_VOLUME) - .withNewEmptyDir() - .endEmptyDir() - .endVolume() - .endSpec(); - } + return new FlinkPod.Builder(flinkPod) .withPod(basicPodBuilder.build()) .withMainContainer(basicMainContainer) @@ -160,6 +160,13 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator { 
.withImagePullPolicy(imagePullPolicy) .withResources(requirements); + // Mount volume for user artifact(s) + mainContainerBuilder + .addNewVolumeMount() + .withName(Constants.USER_ARTIFACTS_VOLUME) + .withMountPath(kubernetesJobManagerParameters.getUserArtifactsBaseDir()) + .endVolumeMount(); + // Merge fields mainContainerBuilder .addAllToPorts(getContainerPorts()) @@ -171,14 +178,7 @@ public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator { .withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH) .build()) .endEnv(); - if (flinkConfig.getBoolean( - KubernetesConfigOptions.KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE)) { - mainContainerBuilder - .addNewVolumeMount() - .withName(Constants.USER_ARTIFACTS_VOLUME) - .withMountPath(kubernetesJobManagerParameters.getUserArtifactsBaseDir()) - .endVolumeMount(); - } + getFlinkLogDirEnv().ifPresent(mainContainerBuilder::addToEnv); return mainContainerBuilder.build(); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index c19c61a7eef..845f930d18d 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -207,6 +207,6 @@ public class KubernetesJobManagerParameters extends AbstractKubernetesParameters } public String getUserArtifactsBaseDir() { - return flinkConfig.getString(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR); + return flinkConfig.getString(ArtifactFetchOptions.BASE_DIR); } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java 
b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java index 1925a722b4c..6ea22c6b3bb 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java @@ -44,8 +44,7 @@ public class KubernetesApplicationClusterEntrypointTest { @BeforeEach public void setup() { configuration = new Configuration(); - configuration.setString( - ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, tempDir.toAbsolutePath().toString()); + configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString()); configuration.setString(KubernetesConfigOptions.NAMESPACE, TEST_NAMESPACE); configuration.setString(KubernetesConfigOptions.CLUSTER_ID, TEST_CLUSTER_ID); } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java index 83c17a14db0..e9b6aafadb9 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/factory/KubernetesJobManagerFactoryTest.java @@ -491,7 +491,7 @@ class KubernetesJobManagerFactoryTest extends KubernetesJobManagerTestBase { @Test public void testArtifactsEmptyDirVolume() throws IOException { - flinkConfig.set(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, "/opt/artifacts"); + flinkConfig.set(ArtifactFetchOptions.BASE_DIR, "/opt/artifacts"); kubernetesJobManagerSpecification = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( flinkPod, kubernetesJobManagerParameters); @@ -509,26 +509,4 @@ class KubernetesJobManagerFactoryTest extends 
KubernetesJobManagerTestBase { kubernetesJobManagerParameters .getUserArtifactsBaseDir())); } - - @Test - public void testTurnOffArtifactsEmptyDirVolume() throws IOException { - flinkConfig.set(KubernetesConfigOptions.KUBERNETES_USER_ARTIFACTS_EMPTYDIR_ENABLE, false); - flinkConfig.set(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, "/opt/artifacts"); - kubernetesJobManagerSpecification = - KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( - flinkPod, kubernetesJobManagerParameters); - final PodSpec podSpec = - kubernetesJobManagerSpecification.getDeployment().getSpec().getTemplate().getSpec(); - assertThat(podSpec.getVolumes()) - .noneMatch(resource -> resource.getName().equals(Constants.USER_ARTIFACTS_VOLUME)); - final Container container = podSpec.getContainers().get(0); - assertThat(container.getVolumeMounts()) - .noneMatch( - resource -> - resource.getName().equals(Constants.USER_ARTIFACTS_VOLUME) - && resource.getMountPath() - .equals( - kubernetesJobManagerParameters - .getUserArtifactsBaseDir())); - } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java index 9de8ac968ed..67a6fd51fbf 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParametersTest.java @@ -258,7 +258,7 @@ class KubernetesJobManagerParametersTest extends KubernetesTestBase { @Test public void testGetUserArtifactsBaseDir() { - flinkConfig.set(ArtifactFetchOptions.USER_ARTIFACTS_BASE_DIR, "/opt/job/artifacts"); + flinkConfig.set(ArtifactFetchOptions.BASE_DIR, "/opt/job/artifacts"); assertThat(kubernetesJobManagerParameters.getUserArtifactsBaseDir()) .isEqualTo("/opt/job/artifacts"); } 
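The `generateJarDir` change earlier in this commit still scopes fetched artifacts under `<base-dir>/<namespace>/<cluster-id>`; it merely reads the base directory from the renamed `ArtifactFetchOptions.BASE_DIR` option. A standalone sketch of that path construction (hypothetical class, not Flink code):

```java
import java.io.File;

/** Sketch of building a per-cluster artifact directory, as generateJarDir does. */
public class ArtifactDirs {

    /** Joins base dir, namespace, and cluster id with the platform separator. */
    public static String artifactDir(String baseDir, String namespace, String clusterId) {
        return String.join(
                File.separator,
                new File(baseDir).getAbsolutePath(),
                namespace,
                clusterId);
    }

    public static void main(String[] args) {
        // Namespace and cluster id keep artifacts of different deployments apart
        // even when they share the same emptyDir-backed base directory.
        System.out.println(artifactDir("/opt/flink/artifacts", "default", "my-flink-app"));
    }
}
```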
diff --git a/flink-test-utils-parent/flink-clients-test-utils/pom.xml b/flink-test-utils-parent/flink-clients-test-utils/pom.xml index 33f51a7c923..608acf6e679 100644 --- a/flink-test-utils-parent/flink-clients-test-utils/pom.xml +++ b/flink-test-utils-parent/flink-clients-test-utils/pom.xml @@ -93,6 +93,26 @@ under the License. <shadedClassifierName>job-lib-jar</shadedClassifierName> </configuration> </execution> + <execution> + <id>test-user-classloader-no-main-jar</id> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*</artifact> + <includes> + <include>**/TestUserClassLoaderAdditionalArtifact.*</include> + <include>META-INF/**</include> + </includes> + </filter> + </filters> + <finalName>test-user-classloader-additional-artifact-jar</finalName> + <shadedArtifactAttached>true</shadedArtifactAttached> + <shadedClassifierName>additional-artifact-jar</shadedClassifierName> + </configuration> + </execution> </executions> </plugin> </plugins> diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java b/flink-test-utils-parent/flink-clients-test-utils/src/main/java/org/apache/flink/client/testjar/TestUserClassLoaderAdditionalArtifact.java similarity index 56% copy from flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java copy to flink-test-utils-parent/flink-clients-test-utils/src/main/java/org/apache/flink/client/testjar/TestUserClassLoaderAdditionalArtifact.java index 8d43998079b..35800679745 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/artifact/ArtifactFetcher.java +++ b/flink-test-utils-parent/flink-clients-test-utils/src/main/java/org/apache/flink/client/testjar/TestUserClassLoaderAdditionalArtifact.java @@ -16,23 +16,12 @@ * limitations under the License. 
*/ -package org.apache.flink.client.program.artifact; +package org.apache.flink.client.testjar; -import org.apache.flink.configuration.Configuration; +/** This class is used to test classloading from additional user artifacts. */ +public class TestUserClassLoaderAdditionalArtifact { -import java.io.File; - -/** The artifact fetcher. */ -public interface ArtifactFetcher { - - /** - * Fetch the resource from the uri to the targetDir. - * - * @param uri The artifact to be fetched. - * @param flinkConfiguration Flink configuration. - * @param targetDir The target dir to put the artifact. - * @return The path of the fetched artifact. - * @throws Exception Error during fetching the artifact. - */ - File fetch(String uri, Configuration flinkConfiguration, File targetDir) throws Exception; + int getNum() { + return 1; + } }
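Overall, the commit replaces the single `ArtifactUtils.fetch` path with an `ArtifactFetchManager` that delegates to `LocalArtifactFetcher`, `FsArtifactFetcher`, or `HttpArtifactFetcher` depending on the artifact URI. A hedged sketch of such scheme-based dispatch follows; the selection rules are an assumption inferred from the fetcher names in the file listing, not the manager's actual logic:

```java
import java.net.URI;

/** Illustrative scheme-based dispatch between artifact fetchers. */
public class FetcherSelector {

    /** Picks a fetcher name for the given artifact URI (illustrative rules). */
    public static String selectFetcher(String artifactUri) {
        URI uri = URI.create(artifactUri);
        String scheme = uri.getScheme();
        if (scheme == null || "local".equals(scheme)) {
            // Artifact is already present on the container's local filesystem.
            return "LocalArtifactFetcher";
        }
        if ("http".equals(scheme) || "https".equals(scheme)) {
            // Downloaded over HTTP, optionally with custom headers.
            return "HttpArtifactFetcher";
        }
        // Anything else (s3://, hdfs://, file://, ...) would go through a
        // distributed-filesystem abstraction such as Flink's FileSystem.
        return "FsArtifactFetcher";
    }

    public static void main(String[] args) {
        System.out.println(selectFetcher("local:///opt/flink/artifacts/job.jar"));
        System.out.println(selectFetcher("https://example.com/job.jar"));
        System.out.println(selectFetcher("s3://bucket/job.jar"));
    }
}
```

Centralizing the dispatch in one manager is what lets the entrypoint fetch the job jar and any `ArtifactFetchOptions.ARTIFACT_LIST` dependencies uniformly, regardless of where each artifact lives.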