This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe94bf0  [SPARK-36014][K8S] Use uuid as app id in kubernetes client mode
fe94bf0 is described below

commit fe94bf07f9acec302e7d8becd7e576c777337331
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Sun Jul 18 15:41:47 2021 -0700

    [SPARK-36014][K8S] Use uuid as app id in kubernetes client mode
    
    ### What changes were proposed in this pull request?
    
    Use a UUID instead of `System.currentTimeMillis` as the app id in Kubernetes client mode.
    
    ### Why are the changes needed?
    
    Currently, Spark on Kubernetes in client mode uses `"spark-application-" + System.currentTimeMillis` as the app id by default. This can cause app id conflicts when several Spark applications are submitted to the Kubernetes cluster within a short time.
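    
    The difference is easy to demonstrate in isolation. The sketch below is illustrative only and is not Spark code (the names `AppIdDemo`, `timestampId`, and `uuidId` are made up for the example): two back-to-back calls can return the same millisecond-based id, while UUID-based ids are effectively always distinct.
    
    ```
    // Illustrative sketch only, not Spark code: compares the old and new id schemes.
    import java.util.UUID
    
    object AppIdDemo {
      // Old scheme: two submissions in the same millisecond get the same id.
      def timestampId(): String = "spark-application-" + System.currentTimeMillis
    
      // New scheme: a dash-free random UUID; collisions are practically impossible.
      def uuidId(): String = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    
      def main(args: Array[String]): Unit = {
        println(timestampId() == timestampId())  // often true when called back to back
        println(uuidId() == uuidId())            // false
      }
    }
    ```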
    
    Unfortunately, the event log uses the app id as its file name, so conflicting app ids produce conflicting event log files and the following exception is thrown:
    
    ```
    Caused by: java.io.FileNotFoundException: File does not exist: xxx/spark-application-1624766876324.lz4.inprogress (inode 5984170846) Holder does not have any open files.
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2697)
            at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.analyzeFileState(FSDirWriteFileOp.java:521)
            at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.validateAddBlock(FSDirWriteFileOp.java:161)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2579)
            at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:846)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:510)
            at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
            at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:871)
            at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:817)
    ```
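    
    The fix factors the id generation into a shared helper, `KubernetesConf.getKubernetesAppId`, used by both the submission client and the client-mode scheduler backend. A minimal sketch of that helper, mirroring the diff below:
    
    ```
    import java.util.UUID
    
    // A dash-free random UUID prefixed with "spark-"; well under the 63-character
    // limit for Kubernetes label values.
    def getKubernetesAppId(): String =
      s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
    ```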
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. In client mode, the default app id format changes from `spark-application-<timestamp>` to `spark-<uuid>`.
    
    ### How was this patch tested?
    
    Manual test; see the screenshot below.
    
    ![image](https://user-images.githubusercontent.com/12025282/124435341-7a88e180-dda7-11eb-8e62-bdfec6a0ee3b.png)
    
    Closes #33211 from ulysses-you/k8s-appid.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala    | 5 ++++-
 .../spark/deploy/k8s/submit/KubernetesClientApplication.scala      | 4 +---
 .../scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala  | 7 ++++---
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index 937c5f5..de084da 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.deploy.k8s
 
-import java.util.Locale
+import java.util.{Locale, UUID}
 
 import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
 
@@ -225,6 +225,9 @@ private[spark] object KubernetesConf {
     new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod, resourceProfileId)
   }
 
+  def getKubernetesAppId(): String =
+    s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+
   def getResourceNamePrefix(appName: String): String = {
     val id = KubernetesUtils.uniqueID()
     s"$appName-$id"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3140502..e3b80b1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.submit
 
-import java.util.UUID
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.control.Breaks._
@@ -191,7 +189,7 @@ private[spark] class KubernetesClientApplication extends SparkApplication {
     // to be added as a label to group resources belonging to the same application. Label values are
     // considerably restrictive, e.g. must be no longer than 63 characters in length. So we generate
     // a unique app ID (captured by spark.app.id) in the format below.
-    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
+    val kubernetesAppId = KubernetesConf.getKubernetesAppId()
     val kubernetesConf = KubernetesConf.createDriverConf(
       sparkConf,
       kubernetesAppId,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 5dad6a3..42a9300 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -24,9 +24,9 @@ import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkContext
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.deploy.k8s.KubernetesUtils
 import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
@@ -48,6 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
     extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
+  private val appId = KubernetesConf.getKubernetesAppId()
 
   protected override val minRegisteredRatio =
     if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
@@ -83,12 +84,12 @@ private[spark] class KubernetesClusterSchedulerBackend(
   /**
    * Get an application ID associated with the job.
    * This returns the string value of spark.app.id if set, otherwise
-   * the locally-generated ID from the superclass.
+   * the locally-generated ID.
    *
    * @return The application ID
    */
   override def applicationId(): String = {
-    conf.getOption("spark.app.id").map(_.toString).getOrElse(super.applicationId)
+    conf.getOption("spark.app.id").map(_.toString).getOrElse(appId)
   }
 
   override def start(): Unit = {
