This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new fe94bf0  [SPARK-36014][K8S] Use uuid as app id in kubernetes client mode
fe94bf0 is described below

commit fe94bf07f9acec302e7d8becd7e576c777337331
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Sun Jul 18 15:41:47 2021 -0700

[SPARK-36014][K8S] Use uuid as app id in kubernetes client mode

### What changes were proposed in this pull request?

Use a UUID instead of `System.currentTimeMillis` as the app id in Kubernetes client mode.

### Why are the changes needed?

Currently, Spark on Kubernetes in client mode uses `"spark-application-" + System.currentTimeMillis` as the app id by default. This can cause app id conflicts when several Spark applications are submitted to the Kubernetes cluster within a short time. Unfortunately, the event log uses the app id as its file name, so conflicting app ids produce conflicting event log files and the following exception is thrown:

```
Caused by: java.io.FileNotFoundException: File does not exist: xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2579)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:846)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
```

### Does this PR introduce _any_ user-facing change?

Yes.

### How was this patch tested?

Manual test.

![image](https://user-images.githubusercontent.com/12025282/124435341-7a88e180-dda7-11eb-8e62-bdfec6a0ee3b.png)

Closes #33211 from ulysses-you/k8s-appid.
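For readers who want to see the contrast in isolation, here is a minimal, self-contained Scala sketch of the two id schemes the commit message compares. The object and method names are hypothetical; only the UUID expression mirrors the `getKubernetesAppId` helper introduced in the diff below.

```scala
import java.util.UUID

object AppIdSketch {
  // Old scheme: millisecond timestamp. Two submissions in the same
  // millisecond produce the same id, hence the event-log collision above.
  def timestampAppId(): String = s"spark-application-${System.currentTimeMillis}"

  // New scheme (mirrors KubernetesConf.getKubernetesAppId in this commit):
  // a random UUID with the dashes stripped, unique across concurrent submissions.
  def uuidAppId(): String = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"

  def main(args: Array[String]): Unit = {
    println(timestampAppId()) // e.g. spark-application-1624766876324
    println(uuidAppId())      // e.g. spark-<32 hex characters>
  }
}
```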
Authored-by: ulysses-you <ulyssesyo...@gmail.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala   | 5 ++++-
 .../spark/deploy/k8s/submit/KubernetesClientApplication.scala     | 4 +---
 .../scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala | 7 ++++---
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 937c5f5..de084da 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.util.Locale
+import java.util.{Locale, UUID}
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
 
@@ -225,6 +225,9 @@ private[spark] object KubernetesConf {
     new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod, resourceProfileId)
   }
 
+  def getKubernetesAppId(): String =
+    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+
   def getResourceNamePrefix(appName: String): String = {
     val id = KubernetesUtils.uniqueID()
     s"$appName-$id"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3140502..e3b80b1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import java.util.UUID
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.Breaks._
@@ -191,7 +189,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // to be added as a label to group resources belonging to the same application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
-    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+    val kubernetesAppId = KubernetesConf.getKubernetesAppId()
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
       kubernetesAppId,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 5dad6a3..42a9300 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -24,9 +24,9 @@ import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
@@ -48,6 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+  private val appId = KubernetesConf.getKubernetesAppId()
 
   protected override val minRegisteredRatio =
     if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
@@ -83,12 +84,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   /**
    * Get an application ID associated with the job.
    * This returns the string value of spark.app.id if set, otherwise
-   * the locally-generated ID from the superclass.
+   * the locally-generated ID.
    *
    * @return The application ID
    */
   override def applicationId(): String = {
-    conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+    conf.getOption("spark.app.id").map(_.toString).getOrElse(appId)
   }
 
   override def start(): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org