This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8d29d3  [SPARK-32217] Plumb whether a worker would also be decommissioned along with executor
f8d29d3 is described below

commit f8d29d371cdf0b8be6a48a9124ffbc3c0794f32a
Author: Devesh Agrawal <devesh.agra...@gmail.com>
AuthorDate: Wed Jul 22 21:04:06 2020 -0700

    [SPARK-32217] Plumb whether a worker would also be decommissioned along with executor
    
    ### What changes were proposed in this pull request?
    
    This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
    with the DecommissionExecutor message.
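    
    As a minimal, self-contained sketch (the names mirror the `ExecutorDecommissionInfo`
    case class and `DecommissionExecutor` message added in the diff below; the wrapping
    object and the example values are only for illustration):
    
    ```scala
    object PlumbingSketch {
      // New detail carried along with a decommission notification.
      case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
    
      // The cluster message now carries the info together with the executor id.
      case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
    
      def main(args: Array[String]): Unit = {
        // e.g. what a standalone-Master-originated decommission now conveys to the driver:
        val msg = DecommissionExecutor(
          "0", ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = true))
        println(msg)
      }
    }
    ```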
    
    ### Why are the changes needed?
    
    The primary motivation is to know whether a decommissioned executor
    would also be losing shuffle files -- and thus it is important to know
    whether the host would also be decommissioned.
    
    In the absence of this PR, the existing code assumes that decommissioning
    an executor does not lose the whole host with it, and thus does not clear
    the shuffle state if external shuffle service is enabled. While this may
    hold in some cases (like K8s decommissioning an executor pod, or YARN
    container preemption), it does not hold in others like when the cluster is
    managed by a Standalone Scheduler (Master). This is similar to the existing
    `workerLost` field in the `ExecutorProcessLost [...]
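    
    For illustration only (this patch just plumbs the information; the consuming logic
    sketched here is hypothetical, not code from the patch): a scheduler-side consumer
    could combine `isHostDecommissioned` with the external shuffle service setting to
    decide whether shuffle output should be treated as at risk:
    
    ```scala
    object ShuffleLossSketch {
      case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
    
      def shuffleDataMayBeLost(
          info: ExecutorDecommissionInfo,
          externalShuffleServiceEnabled: Boolean): Boolean = {
        // An external shuffle service keeps shuffle files available after an executor
        // exits, but not after the whole host goes away.
        !externalShuffleServiceEnabled || info.isHostDecommissioned
      }
    
      def main(args: Array[String]): Unit = {
        val info = ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = true)
        // Host loss puts shuffle files at risk even with the external shuffle service on.
        println(shuffleDataMayBeLost(info, externalShuffleServiceEnabled = true))
      }
    }
    ```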
    
    In the future, this `ExecutorDecommissionInfo` can be extended to carry how
    long the executor has left to live, for scenarios like cloud spot kills (or
    YARN preemption) and the like.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tweaked an existing unit test in `AppClientSuite`
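    
    The essence of the tweaked assertion, as a standalone sketch (the real test is in
    `AppClientSuite` in the diff below; the executor id and wrapping object here are
    made up for illustration):
    
    ```scala
    import java.util.concurrent.ConcurrentHashMap
    
    object AppClientAssertionSketch {
      case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
    
      def main(args: Array[String]): Unit = {
        // The listener now records the decommission info per executor id.
        val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionInfo]()
        execDecommissionedMap.putIfAbsent(
          "exec-1", ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = true))
    
        val info = execDecommissionedMap.get("exec-1")
        assert(info != null && info.isHostDecommissioned,
          "exec-1 should have been decommissioned along with its worker")
      }
    }
    ```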
    
    Closes #29032 from agrawaldevesh/plumb_decom_info.
    
    Authored-by: Devesh Agrawal <devesh.agra...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../spark/deploy/client/StandaloneAppClient.scala  |  4 +++-
 .../client/StandaloneAppClientListener.scala       |  4 +++-
 .../org/apache/spark/deploy/master/Master.scala    |  5 +++-
 .../executor/CoarseGrainedExecutorBackend.scala    | 16 +++++++++----
 .../spark/scheduler/ExecutorDecommissionInfo.scala | 28 ++++++++++++++++++++++
 .../org/apache/spark/scheduler/TaskScheduler.scala |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  3 ++-
 .../cluster/CoarseGrainedClusterMessage.scala      |  4 +++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 22 +++++++++--------
 .../cluster/StandaloneSchedulerBackend.scala       |  6 ++---
 .../spark/deploy/client/AppClientSuite.scala       | 15 ++++++++----
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  8 +++++--
 .../scheduler/ExternalClusterManagerSuite.scala    |  4 +++-
 .../WorkerDecommissionExtendedSuite.scala          |  2 +-
 .../spark/scheduler/WorkerDecommissionSuite.scala  |  2 +-
 .../BlockManagerDecommissionIntegrationSuite.scala |  2 +-
 16 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index eedf5e9..a6da839 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 /**
@@ -181,7 +182,8 @@ private[spark] class StandaloneAppClient(
         if (ExecutorState.isFinished(state)) {
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
         } else if (state == ExecutorState.DECOMMISSIONED) {
-          listener.executorDecommissioned(fullId, message.getOrElse(""))
+          listener.executorDecommissioned(fullId,
+            ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
         }
 
       case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 2e38a68..e72f7e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.client
 
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+
 /**
  * Callbacks invoked by deploy client when various events happen. There are currently five events:
  * connecting to the cluster, disconnecting, being given an executor, having an executor removed
@@ -39,7 +41,7 @@ private[spark] trait StandaloneAppClientListener {
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 
-  def executorDecommissioned(fullId: String, message: String): Unit
+  def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
   def workerRemoved(workerId: String, host: String, message: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0070df1..220e1c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -908,7 +908,10 @@ private[deploy] class Master(
         logInfo("Telling app of decommission executors")
         exec.application.driver.send(ExecutorUpdated(
           exec.id, ExecutorState.DECOMMISSIONED,
-          Some("worker decommissioned"), None, workerLost = false))
+          Some("worker decommissioned"), None,
+          // workerLost is being set to true here to let the driver know that the host (aka. worker)
+          // is also being decommissioned.
+          workerLost = true))
         exec.state = ExecutorState.DECOMMISSIONED
         exec.application.removeExecutor(exec)
       }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e072d79..def125b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -166,11 +166,15 @@ private[spark] class CoarseGrainedExecutorBackend(
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
         if (decommissioned) {
-          logError("Asked to launch a task while decommissioned.")
+          val msg = "Asked to launch a task while decommissioned."
+          logError(msg)
           driver match {
             case Some(endpoint) =>
               logInfo("Sending DecommissionExecutor to driver.")
-              endpoint.send(DecommissionExecutor(executorId))
+              endpoint.send(
+                DecommissionExecutor(
+                  executorId,
+                  ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
             case _ =>
               logError("No registered driver to send Decommission to.")
           }
@@ -259,12 +263,14 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
+    val msg = "Decommissioning self w/sync"
+    logInfo(msg)
     try {
       decommissioned = true
       // Tell master we are are decommissioned so it stops trying to schedule us
       if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+        driver.get.askSync[Boolean](DecommissionExecutor(
+            executorId, ExecutorDecommissionInfo(msg, false)))
       } else {
         logError("No driver to message decommissioning.")
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
new file mode 100644
index 0000000..a82b5d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * Provides more detail when an executor is being decommissioned.
+ * @param message Human readable reason for why the decommissioning is happening.
+ * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
+ *                             being decommissioned too. Used to infer if the shuffle data might
+ *                             be lost even if the external shuffle service is enabled.
+ */
+private[spark]
+case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 08f9f3c..b29458c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -101,7 +101,7 @@ private[spark] trait TaskScheduler {
   /**
    * Process a decommissioning executor.
    */
-  def executorDecommission(executorId: String): Unit
+  def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
   /**
    * Process a lost executor
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 12bd932..28e138e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -912,7 +912,8 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def executorDecommission(executorId: String): Unit = {
+  override def executorDecommission(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index bb929c2..91485f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.SerializableBuffer
 
@@ -94,7 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
-  case class DecommissionExecutor(executorId: String)  extends CoarseGrainedClusterMessage
+  case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
+    extends CoarseGrainedClusterMessage
 
   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b4b3efa..8fbefae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -191,9 +191,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
         removeExecutor(executorId, reason)
 
-      case DecommissionExecutor(executorId) =>
-        logError(s"Received decommission executor message ${executorId}.")
-        decommissionExecutor(executorId)
+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: 
$decommissionInfo")
+        decommissionExecutor(executorId, decommissionInfo)
 
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
@@ -272,9 +272,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case DecommissionExecutor(executorId) =>
-        logError(s"Received decommission executor message ${executorId}.")
-        decommissionExecutor(executorId)
+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: 
${decommissionInfo}.")
+        decommissionExecutor(executorId, decommissionInfo)
         context.reply(true)
 
       case RetrieveSparkAppConfig(resourceProfileId) =>
@@ -422,7 +422,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     /**
      * Mark a given executor as decommissioned and stop making resource offers for it.
      */
-    private def decommissionExecutor(executorId: String): Boolean = {
+    private def decommissionExecutor(
+        executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
       val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
@@ -436,7 +437,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       if (shouldDisable) {
         logInfo(s"Starting decommissioning executor $executorId.")
         try {
-          scheduler.executorDecommission(executorId)
+          scheduler.executorDecommission(executorId, decommissionInfo)
         } catch {
           case e: Exception =>
             logError(s"Unexpected error during decommissioning ${e.toString}", 
e)
@@ -590,10 +591,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Called by subclasses when notified of a decommissioning executor.
    */
-  private[spark] def decommissionExecutor(executorId: String): Unit = {
+  private[spark] def decommissionExecutor(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
     if (driverEndpoint != null) {
       logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId))
+      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 4024b44..d921af6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -174,10 +174,10 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
-  override def executorDecommissioned(fullId: String, message: String) {
+  override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
     logInfo("Asked to decommission executor")
-    decommissionExecutor(fullId.split("/")(1))
-    logInfo("Executor %s decommissioned: %s".format(fullId, message))
+    decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+    logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }
 
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a3e39d7..e091bd0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.client
 
 import java.io.Closeable
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 
 import scala.concurrent.duration._
 
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, Master}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.util.Utils
 
 /**
@@ -126,7 +127,10 @@ class AppClientSuite
       // Decommissioning is async.
       eventually(timeout(1.seconds), interval(10.millis)) {
         // We only record decommissioning for the executor we've requested
-        assert(ci.listener.execDecommissionedList.size === 1)
+        assert(ci.listener.execDecommissionedMap.size === 1)
+        val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId)
+        assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned,
+          s"$executorId should have been decommissioned along with its worker")
       }
 
       // Send request to kill executor, verify request was made
@@ -215,7 +219,7 @@ class AppClientSuite
     val deadReasonList = new ConcurrentLinkedQueue[String]()
     val execAddedList = new ConcurrentLinkedQueue[String]()
     val execRemovedList = new ConcurrentLinkedQueue[String]()
-    val execDecommissionedList = new ConcurrentLinkedQueue[String]()
+    val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionInfo]()
 
     def connected(id: String): Unit = {
       connectedIdList.add(id)
@@ -245,8 +249,9 @@ class AppClientSuite
       execRemovedList.add(id)
     }
 
-    def executorDecommissioned(id: String, message: String): Unit = {
-      execDecommissionedList.add(id)
+    def executorDecommissioned(id: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+      val previousDecommissionInfo = execDecommissionedMap.putIfAbsent(id, decommissionInfo)
+      assert(previousDecommissionInfo === null, s"Expected no previous decommission info for $id")
     }
 
     def workerRemoved(workerId: String, host: String, message: String): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 664cfc8..c82a5ef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -172,10 +172,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
-    override def executorDecommission(executorId: String) = {}
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def executorDecommission(
+      executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo): Unit = {}
   }
 
   /**
@@ -777,10 +779,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId,
           executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
-      override def executorDecommission(executorId: String): Unit = {}
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
       override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
       override def applicationAttemptId(): Option[String] = None
+      override def executorDecommission(
+        executorId: String,
+        decommissionInfo: ExecutorDecommissionInfo): Unit = {}
     }
     val noKillScheduler = new DAGScheduler(
       sc,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 7ead51b..b2a5f77 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -90,7 +90,6 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
-  override def executorDecommission(executorId: String): Unit = {}
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
@@ -99,4 +98,7 @@ private class DummyTaskScheduler extends TaskScheduler {
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId,
       executorMetrics: Map[(Int, Int), ExecutorMetrics]): Boolean = true
+  override def executorDecommission(
+    executorId: String,
+    decommissionInfo: ExecutorDecommissionInfo): Unit = {}
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 4de5aae..d95deb1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -65,7 +65,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
 
       val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
       sc.getExecutorIds().tail.foreach { id =>
-        sched.decommissionExecutor(id)
+        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
         assert(rdd3.sortByKey().collect().length === 100)
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index cd3ab4d..3c34070 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -73,7 +73,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId))
+    execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
     // Try and launch task after decommissioning, this should fail
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index afcb38b..5741010 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -146,7 +146,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
 
     val execToDecommission = execs.head
     logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission)
+    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
