This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 924ebbb6619 [SPARK-41766][CORE] Handle decommission request sent before executor registration
924ebbb6619 is described below

commit 924ebbb6619af784dcb42a27f2b6e36fd1523e1d
Author: Warren Zhu <warren.zh...@gmail.com>
AuthorDate: Mon May 1 15:03:28 2023 -0700

    [SPARK-41766][CORE] Handle decommission request sent before executor registration
    
    ### What changes were proposed in this pull request?
    Handle decommission requests sent before executor registration. When a
    decommission request is triggered from a worker, the executor should be
    decommissioned and stop accepting new tasks; but if the scheduler backend
    receives the request before the executor registers, the executor is never
    decommissioned. So cache the unknown executors and resend the decommission
    request after executor registration.
    
    ### Why are the changes needed?
    Currently, such requests are ignored.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a test in CoarseGrainedSchedulerBackendSuite.
    
    Closes #39280 from warrenzhu25/decom-missing.
    
    Authored-by: Warren Zhu <warren.zh...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala | 10 +++++
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 24 ++++++++++
 .../CoarseGrainedSchedulerBackendSuite.scala       | 51 ++++++++++++++++++++--
 3 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 125025d6a2f..fa88606e1f6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2260,6 +2260,16 @@ package object config {
       .checkValue(_ >= 0, "needs to be a non-negative value")
       .createWithDefault(0)
 
+  private[spark] val SCHEDULER_MAX_RETAINED_UNKNOWN_EXECUTORS =
+    ConfigBuilder("spark.scheduler.maxRetainedUnknownDecommissionExecutors")
+      .internal()
+      .doc("Max number of unknown executors by decommission to retain. This 
affects " +
+        "whether executor could receive decommission request sent before its 
registration.")
+      .version("3.5.0")
+      .intConf
+      .checkValue(_ >= 0, "needs to be a non-negative value")
+      .createWithDefault(0)
+
   private[spark] val PUSH_BASED_SHUFFLE_ENABLED =
     ConfigBuilder("spark.shuffle.push.enabled")
       .doc("Set to true to enable push-based shuffle on the client side and 
this works in " +
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 1be20bef012..aeac2616711 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,6 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{HashMap, HashSet, Queue}
 import scala.concurrent.Future
 
+import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, TaskState}
@@ -102,6 +103,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
   protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
 
+  // Executors which received a decommission request before registering.
+  // Such executors should be decommissioned right after registration.
+  // Maps from executorId to (ExecutorDecommissionInfo, adjustTargetNumExecutors,
+  // triggeredByExecutor).
+  protected val unknownExecutorsPendingDecommission =
+    CacheBuilder.newBuilder()
+      .maximumSize(conf.get(SCHEDULER_MAX_RETAINED_UNKNOWN_EXECUTORS))
+      .build[String, (ExecutorDecommissionInfo, Boolean, Boolean)]()
+
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var rpHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty
@@ -295,6 +305,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           listenerBus.post(
             SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
           // Note: some tests expect the reply to come after we put the executor in the map
+          // Decommission an executor whose request was received before its registration
+          Option(unknownExecutorsPendingDecommission.getIfPresent(executorId))
+            .foreach(v => {
+              decommissionExecutors(Array((executorId, v._1)), v._2, v._3)
+              unknownExecutorsPendingDecommission.invalidate(executorId)
+            })
           context.reply(true)
         }
 
@@ -528,14 +544,22 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Do not change this code without running the K8s integration suites
     val executorsToDecommission = executorsAndDecomInfo.flatMap { case (executorId, decomInfo) =>
       // Only bother decommissioning executors which are alive.
+      // Keep the decommission info in case the executor has started but not yet registered
       if (isExecutorActive(executorId)) {
         scheduler.executorDecommission(executorId, decomInfo)
         executorsPendingDecommission(executorId) = decomInfo
         Some(executorId)
       } else {
+        unknownExecutorsPendingDecommission.put(executorId,
+          (decomInfo, adjustTargetNumExecutors, triggeredByExecutor))
         None
       }
     }
+
+    if (executorsToDecommission.isEmpty) {
+      return executorsToDecommission
+    }
+
     logInfo(s"Decommission executors: ${executorsToDecommission.mkString(", ")}")
 
     // If we don't want to replace the executors we are decommissioning
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 15c8ab16ec9..c4d4fd7d80e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -21,8 +21,10 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
+import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import scala.reflect.ClassTag
 
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.when
@@ -37,10 +39,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
-import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
+import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, ExecutorInfo}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.{RpcUtils, SerializableBuffer, Utils}
 
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
@@ -492,6 +493,32 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  test("SPARK-41766: New registered executor should receive decommission 
request" +
+    " sent before registration") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[0, 3, 1024]")
+      .setAppName("test")
+      .set(SCHEDULER_MAX_RETAINED_UNKNOWN_EXECUTORS.key, "1")
+
+    sc = new SparkContext(conf)
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val mockEndpointRef = new MockExecutorRpcEndpointRef(conf)
+    val mockAddress = mock[RpcAddress]
+    val executorId = "1"
+    val executorDecommissionInfo = ExecutorDecommissionInfo(
+      s"Executor $executorId is decommissioned")
+
+    backend.decommissionExecutor(executorId, executorDecommissionInfo, false)
+    assert(!mockEndpointRef.decommissionReceived)
+
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, Map(), Map(),
+        Map.empty, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
+
+    sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
+    assert(mockEndpointRef.decommissionReceived)
+  }
+
   private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
     sc.submitJob(
       rdd,
@@ -546,3 +573,21 @@ class TestCoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, override v
 
   def getTaskSchedulerImpl(): TaskSchedulerImpl = scheduler
 }
+
+private[spark] class MockExecutorRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+  // scalastyle:off executioncontextglobal
+  import scala.concurrent.ExecutionContext.Implicits.global
+  // scalastyle:on executioncontextglobal
+
+  var decommissionReceived = false
+
+  override def address: RpcAddress = null
+  override def name: String = "executor"
+  override def send(message: Any): Unit =
+    message match {
+      case DecommissionExecutor => decommissionReceived = true
+    }
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    Future{true.asInstanceOf[T]}
+  }
+}

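For reference, a hedged sketch of how the new internal config could be enabled
from application code; the master URL and app name below are illustrative only:

```scala
import org.apache.spark.SparkConf

// The config defaults to 0, so the pending-decommission cache is effectively
// disabled unless it is set to a positive value.
val conf = new SparkConf()
  .setMaster("local-cluster[2, 1, 1024]")   // illustrative master URL
  .setAppName("decom-before-registration")  // illustrative app name
  .set("spark.scheduler.maxRetainedUnknownDecommissionExecutors", "1")
```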

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
