This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ceae41b  [SPARK-37497][K8S] Promote 
`ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi
ceae41b is described below

commit ceae41ba5cafb479cdcfc9a6a162945646a68f05
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Tue Nov 30 18:41:18 2021 -0800

    [SPARK-37497][K8S] Promote 
`ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi
    
    ### What changes were proposed in this pull request?
    
    This PR aims to promote `ExecutorPodsWatchSnapshotSource` and 
`ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to 
maintain them officially in a backward compatible way at Apache Spark 3.3.0.
    
    ### Why are the changes needed?
    
    - Since SPARK-24248 at Apache Spark 2.4.0, 
`ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have 
been used to monitor executor pods without any interface changes for over 3 
years.
    
    - Apache Spark 3.1.1 makes the `Kubernetes` module GA and provides an 
extensible external cluster manager framework. A new `ExternalClusterManager` for 
the K8s environment needs to depend on this to monitor pods.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual review.
    
    Closes #34751 from dongjoon-hyun/SPARK-37497.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 2b044962cd6eff5a3a76f2808ee93b40bdf931df)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala    | 13 ++++++++++++-
 .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala      | 14 +++++++++++++-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index da7fe7c..6fcb876 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -22,12 +22,21 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkConf
+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ThreadUtils, Utils}
 
-private[spark] class ExecutorPodsPollingSnapshotSource(
+/**
+ * :: DeveloperApi ::
+ *
+ * A class used for polling K8s executor pods by ExternalClusterManagers.
+ * @since 3.1.3
+ */
+@Stable
+@DeveloperApi
+class ExecutorPodsPollingSnapshotSource(
     conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -37,6 +46,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
 
   private var pollingFuture: Future[_] = _
 
+  @Since("3.1.3")
   def start(applicationId: String): Unit = {
     require(pollingFuture == null, "Cannot start polling more than once.")
     logDebug(s"Starting to check for executor pod state every $pollingInterval 
ms.")
@@ -44,6 +54,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
       new PollRunnable(applicationId), pollingInterval, pollingInterval, 
TimeUnit.MILLISECONDS)
   }
 
+  @Since("3.1.3")
   def stop(): Unit = {
     if (pollingFuture != null) {
       pollingFuture.cancel(true)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
index 762878c..06d942e 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, 
WatcherException}
 import io.fabric8.kubernetes.client.Watcher.Action
 
+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
-private[spark] class ExecutorPodsWatchSnapshotSource(
+/**
+ * :: DeveloperApi ::
+ *
+ * A class used for watching K8s executor pods by ExternalClusterManagers.
+ *
+ * @since 3.1.3
+ */
+@Stable
+@DeveloperApi
+class ExecutorPodsWatchSnapshotSource(
     snapshotsStore: ExecutorPodsSnapshotsStore,
     kubernetesClient: KubernetesClient) extends Logging {
 
   private var watchConnection: Closeable = _
 
+  @Since("3.1.3")
   def start(applicationId: String): Unit = {
     require(watchConnection == null, "Cannot start the watcher twice.")
     logDebug(s"Starting watch for pods with labels 
$SPARK_APP_ID_LABEL=$applicationId," +
@@ -42,6 +53,7 @@ private[spark] class ExecutorPodsWatchSnapshotSource(
       .watch(new ExecutorPodsWatcher())
   }
 
+  @Since("3.1.3")
   def stop(): Unit = {
     if (watchConnection != null) {
       Utils.tryLogNonFatalError {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to