This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a3aec1f904 [SPARK-44906][K8S] Make `Kubernetes[Driver|Executor]Conf.annotations` substitute annotations instead of feature steps
2a3aec1f904 is described below

commit 2a3aec1f9040e08999a2df88f92340cd2710e552
Author: zwangsheng <2213335...@qq.com>
AuthorDate: Wed Aug 23 10:42:47 2023 -0700

    [SPARK-44906][K8S] Make `Kubernetes[Driver|Executor]Conf.annotations` substitute annotations instead of feature steps
    
    ### What changes were proposed in this pull request?
    
    Move the `Utils.substituteAppNExecIds` logic into `KubernetesConf.annotations` as the default behavior.
    
    ### Why are the changes needed?
    
    This makes the substitution easy for users to reuse, rather than forcing them to reimplement the same logic.
    
    Before this PR, when users wrote a custom feature step that used annotations, they had to call `Utils.substituteAppNExecIds` themselves. A rough sketch of such a step is shown below.
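
    As an illustration only (this is not code from this patch, and the class name `MyAnnotationFeatureStep` is purely hypothetical), a custom executor feature step can now attach `conf.annotations` directly, because the `{{APP_ID}}` and `{{EXECUTOR_ID}}` placeholders are already substituted:

    ```scala
    import scala.jdk.CollectionConverters._

    import io.fabric8.kubernetes.api.model.PodBuilder

    import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, SparkPod}
    import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep

    // Illustrative custom feature step: after this change, conf.annotations
    // already has {{APP_ID}} / {{EXECUTOR_ID}} resolved, so the step does not
    // need to call Utils.substituteAppNExecIds itself.
    class MyAnnotationFeatureStep(conf: KubernetesExecutorConf)
      extends KubernetesFeatureConfigStep {

      override def configurePod(pod: SparkPod): SparkPod = {
        val podWithAnnotations = new PodBuilder(pod.pod)
          .editOrNewMetadata()
            // annotations are copied onto the executor pod metadata as-is
            .addToAnnotations(conf.annotations.asJava)
            .endMetadata()
          .build()
        SparkPod(podWithAnnotations, pod.container)
      }
    }
    ```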
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but it makes no observable difference for users of annotations: the `{{APP_ID}}`/`{{EXECUTOR_ID}}` placeholders are still substituted, only now inside `KubernetesConf.annotations` instead of in the feature steps.
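
    For example, with a configuration like the following (the annotation key `my-annotation` is only an example), the placeholders still resolve to the application and executor IDs as before, just via `KubernetesConf.annotations`:

    ```
    spark.kubernetes.driver.annotation.my-annotation={{APP_ID}}
    spark.kubernetes.executor.annotation.my-annotation={{EXECUTOR_ID}}
    ```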
    
    ### How was this patch tested?
    
    Added unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #42600 from zwangsheng/SPARK-44906.
    
    Lead-authored-by: zwangsheng <2213335...@qq.com>
    Co-authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/deploy/k8s/KubernetesConf.scala  |  2 ++
 .../spark/deploy/k8s/features/BasicDriverFeatureStep.scala  |  3 +--
 .../deploy/k8s/features/BasicExecutorFeatureStep.scala      |  6 ++----
 .../org/apache/spark/deploy/k8s/KubernetesConfSuite.scala   | 13 ++++++++++---
 4 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index d8cb881bf08..4ebf31ae44e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -117,6 +117,7 @@ private[spark] class KubernetesDriverConf(
 
   override def annotations: Map[String, String] = {
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+      .map { case (k, v) => (k, Utils.substituteAppNExecIds(v, appId, "")) }
   }
 
   def serviceLabels: Map[String, String] = {
@@ -188,6 +189,7 @@ private[spark] class KubernetesExecutorConf(
 
   override def annotations: Map[String, String] = {
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
+      .map { case(k, v) => (k, Utils.substituteAppNExecIds(v, appId, executorId)) }
   }
 
   override def secretNamesToMountPaths: Map[String, String] = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 2b287ea8560..11a21bb68a6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -143,8 +143,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       .editOrNewMetadata()
         .withName(driverPodName)
         .addToLabels(conf.labels.asJava)
-        .addToAnnotations(conf.annotations.map { case (k, v) =>
-          (k, Utils.substituteAppNExecIds(v, conf.appId, "")) }.asJava)
+        .addToAnnotations(conf.annotations.asJava)
         .endMetadata()
       .editOrNewSpec()
         .withRestartPolicy("Never")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 0b0bbc30ba4..f3e5cad8c9e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -255,14 +255,12 @@ private[spark] class BasicExecutorFeatureStep(
       case "statefulset" => "Always"
       case _ => "Never"
     }
-    val annotations = kubernetesConf.annotations.map { case (k, v) =>
-      (k, Utils.substituteAppNExecIds(v, kubernetesConf.appId, kubernetesConf.executorId))
-    }
+
     val executorPodBuilder = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
         .addToLabels(kubernetesConf.labels.asJava)
-        .addToAnnotations(annotations.asJava)
+        .addToAnnotations(kubernetesConf.annotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
index 3d310a831ea..9963db016ad 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesConfSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+import org.apache.spark.util.Utils
 
 class KubernetesConfSuite extends SparkFunSuite {
 
@@ -42,7 +43,9 @@ class KubernetesConfSuite extends SparkFunSuite {
     "customLabel2Key" -> "customLabel2Value")
   private val CUSTOM_ANNOTATIONS = Map(
     "customAnnotation1Key" -> "customAnnotation1Value",
-    "customAnnotation2Key" -> "customAnnotation2Value")
+    "customAnnotation2Key" -> "customAnnotation2Value",
+    "customAnnotation3Key" -> "{{APP_ID}}",
+    "customAnnotation4Key" -> "{{EXECUTOR_ID}}")
   private val SECRET_NAMES_TO_MOUNT_PATHS = Map(
     "secret1" -> "/mnt/secrets/secret1",
     "secret2" -> "/mnt/secrets/secret2")
@@ -93,7 +96,9 @@ class KubernetesConfSuite extends SparkFunSuite {
       SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName),
       SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) ++
       CUSTOM_LABELS)
-    assert(conf.annotations === CUSTOM_ANNOTATIONS)
+    assert(conf.annotations === CUSTOM_ANNOTATIONS.map {
+      case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, ""))
+    })
     assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
     assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
     assert(conf.environment === CUSTOM_ENVS)
@@ -161,7 +166,9 @@ class KubernetesConfSuite extends SparkFunSuite {
       SPARK_APP_NAME_LABEL -> KubernetesConf.getAppNameLabel(conf.appName),
       SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE,
       SPARK_RESOURCE_PROFILE_ID_LABEL -> DEFAULT_RESOURCE_PROFILE_ID.toString) ++ CUSTOM_LABELS)
-    assert(conf.annotations === CUSTOM_ANNOTATIONS)
+    assert(conf.annotations === CUSTOM_ANNOTATIONS.map {
+      case (k, v) => (k, Utils.substituteAppNExecIds(v, conf.appId, EXECUTOR_ID))
+    })
     assert(conf.secretNamesToMountPaths === SECRET_NAMES_TO_MOUNT_PATHS)
     assert(conf.secretEnvNamesToKeyRefs === SECRET_ENV_VARS)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
