This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f8d29d3  [SPARK-32217] Plumb whether a worker would also be decommissioned along with executor
f8d29d3 is described below

commit f8d29d371cdf0b8be6a48a9124ffbc3c0794f32a
Author: Devesh Agrawal <devesh.agra...@gmail.com>
AuthorDate: Wed Jul 22 21:04:06 2020 -0700

    [SPARK-32217] Plumb whether a worker would also be decommissioned along with executor

    ### What changes were proposed in this pull request?

    This PR is a giant plumbing PR that plumbs an `ExecutorDecommissionInfo` along
    with the DecommissionExecutor message.

    ### Why are the changes needed?

    The primary motivation is to know whether a decommissioned executor would also be
    losing shuffle files -- and thus it is important to know whether the host would
    also be decommissioned.

    In the absence of this PR, the existing code assumes that decommissioning an
    executor does not lose the whole host with it, and thus does not clear the
    shuffle state if external shuffle service is enabled. While this may hold in
    some cases (like K8s decommissioning an executor pod, or YARN container
    preemption), it does not hold in others, like when the cluster is managed by a
    Standalone Scheduler (Master). This is similar to the existing `workerLost`
    field in the `ExecutorProcessLost [...]

    In the future, this `ExecutorDecommissionInfo` can be embellished to convey how
    long the executor has to live, for scenarios like cloud spot kills (or YARN
    preemption) and the like. An illustrative sketch of how a consumer can act on
    this information follows the diff below.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Tweaked an existing unit test in `AppClientSuite`

    Closes #29032 from agrawaldevesh/plumb_decom_info.

    Authored-by: Devesh Agrawal <devesh.agra...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../spark/deploy/client/StandaloneAppClient.scala  |  4 +++-
 .../client/StandaloneAppClientListener.scala       |  4 +++-
 .../org/apache/spark/deploy/master/Master.scala    |  5 +++-
 .../executor/CoarseGrainedExecutorBackend.scala    | 16 +++++++++----
 .../spark/scheduler/ExecutorDecommissionInfo.scala | 28 ++++++++++++++++++++++
 .../org/apache/spark/scheduler/TaskScheduler.scala |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  3 ++-
 .../cluster/CoarseGrainedClusterMessage.scala      |  4 +++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 22 +++++++++--------
 .../cluster/StandaloneSchedulerBackend.scala       |  6 ++---
 .../spark/deploy/client/AppClientSuite.scala       | 15 ++++++++----
 .../apache/spark/scheduler/DAGSchedulerSuite.scala |  8 +++++--
 .../scheduler/ExternalClusterManagerSuite.scala    |  4 +++-
 .../WorkerDecommissionExtendedSuite.scala          |  2 +-
 .../spark/scheduler/WorkerDecommissionSuite.scala  |  2 +-
 .../BlockManagerDecommissionIntegrationSuite.scala |  2 +-
 16 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index eedf5e9..a6da839 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 /**
@@ -181,7 +182,8 @@ private[spark] class StandaloneAppClient(
         if (ExecutorState.isFinished(state)) {
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
         } else if (state == ExecutorState.DECOMMISSIONED) {
-          listener.executorDecommissioned(fullId, message.getOrElse(""))
+          listener.executorDecommissioned(fullId,
+            ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
         }
 
       case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 2e38a68..e72f7e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.client
 
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+
 /**
  * Callbacks invoked by deploy client when various events happen. There are currently five events:
  * connecting to the cluster, disconnecting, being given an executor, having an executor removed
@@ -39,7 +41,7 @@ private[spark] trait StandaloneAppClientListener {
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
 
-  def executorDecommissioned(fullId: String, message: String): Unit
+  def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
   def workerRemoved(workerId: String, host: String, message: String): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0070df1..220e1c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -908,7 +908,10 @@ private[deploy] class Master(
       logInfo("Telling app of decommission executors")
       exec.application.driver.send(ExecutorUpdated(
         exec.id, ExecutorState.DECOMMISSIONED,
-        Some("worker decommissioned"), None, workerLost = false))
+        Some("worker decommissioned"), None,
+        // workerLost is being set to true here to let the driver know that the host (aka. worker)
+        // is also being decommissioned.
+        workerLost = true))
       exec.state = ExecutorState.DECOMMISSIONED
       exec.application.removeExecutor(exec)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index e072d79..def125b 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -40,7 +40,7 @@ import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceProfile._
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, SignalUtils, ThreadUtils, Utils}
@@ -166,11 +166,15 @@ private[spark] class CoarseGrainedExecutorBackend(
       exitExecutor(1, "Received LaunchTask command but executor was null")
     } else {
       if (decommissioned) {
-        logError("Asked to launch a task while decommissioned.")
+        val msg = "Asked to launch a task while decommissioned."
+        logError(msg)
         driver match {
           case Some(endpoint) =>
             logInfo("Sending DecommissionExecutor to driver.")
-            endpoint.send(DecommissionExecutor(executorId))
+            endpoint.send(
+              DecommissionExecutor(
+                executorId,
+                ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
           case _ =>
             logError("No registered driver to send Decommission to.")
         }
@@ -259,12 +263,14 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   private def decommissionSelf(): Boolean = {
-    logInfo("Decommissioning self w/sync")
+    val msg = "Decommissioning self w/sync"
+    logInfo(msg)
     try {
       decommissioned = true
       // Tell master we are are decommissioned so it stops trying to schedule us
       if (driver.nonEmpty) {
-        driver.get.askSync[Boolean](DecommissionExecutor(executorId))
+        driver.get.askSync[Boolean](DecommissionExecutor(
+          executorId, ExecutorDecommissionInfo(msg, false)))
       } else {
         logError("No driver to message decommissioning.")
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
new file mode 100644
index 0000000..a82b5d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * Provides more detail when an executor is being decommissioned.
+ * @param message Human readable reason for why the decommissioning is happening.
+ * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
+ *                             being decommissioned too. Used to infer if the shuffle data might
+ *                             be lost even if the external shuffle service is enabled.
+ */
+private[spark]
+case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 08f9f3c..b29458c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -101,7 +101,7 @@ private[spark] trait TaskScheduler {
   /**
    * Process a decommissioning executor.
    */
-  def executorDecommission(executorId: String): Unit
+  def executorDecommission(executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
 
   /**
    * Process a lost executor
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 12bd932..28e138e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -912,7 +912,8 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def executorDecommission(executorId: String): Unit = {
+  override def executorDecommission(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
     rootPool.executorDecommission(executorId)
     backend.reviveOffers()
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index bb929c2..91485f0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.SerializableBuffer
 
@@ -94,7 +95,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
-  case class DecommissionExecutor(executorId: String) extends CoarseGrainedClusterMessage
+  case class DecommissionExecutor(executorId: String, decommissionInfo: ExecutorDecommissionInfo)
+    extends CoarseGrainedClusterMessage
 
   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b4b3efa..8fbefae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -191,9 +191,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
       removeExecutor(executorId, reason)
 
-    case DecommissionExecutor(executorId) =>
-      logError(s"Received decommission executor message ${executorId}.")
-      decommissionExecutor(executorId)
+    case DecommissionExecutor(executorId, decommissionInfo) =>
+      logError(s"Received decommission executor message ${executorId}: $decommissionInfo")
+      decommissionExecutor(executorId, decommissionInfo)
 
     case RemoveWorker(workerId, host, message) =>
       removeWorker(workerId, host, message)
@@ -272,9 +272,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case DecommissionExecutor(executorId) =>
-        logError(s"Received decommission executor message ${executorId}.")
-        decommissionExecutor(executorId)
+      case DecommissionExecutor(executorId, decommissionInfo) =>
+        logError(s"Received decommission executor message ${executorId}: ${decommissionInfo}.")
+        decommissionExecutor(executorId, decommissionInfo)
         context.reply(true)
 
       case RetrieveSparkAppConfig(resourceProfileId) =>
@@ -422,7 +422,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Mark a given executor as decommissioned and stop making resource offers for it.
    */
-  private def decommissionExecutor(executorId: String): Boolean = {
+  private def decommissionExecutor(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Boolean = {
     val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
@@ -436,7 +437,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     if (shouldDisable) {
       logInfo(s"Starting decommissioning executor $executorId.")
       try {
-        scheduler.executorDecommission(executorId)
+        scheduler.executorDecommission(executorId, decommissionInfo)
       } catch {
         case e: Exception =>
           logError(s"Unexpected error during decommissioning ${e.toString}", e)
@@ -590,10 +591,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Called by subclasses when notified of a decommissioning executor.
    */
-  private[spark] def decommissionExecutor(executorId: String): Unit = {
+  private[spark] def decommissionExecutor(
+      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
     if (driverEndpoint != null) {
       logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId))
+      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 4024b44..d921af6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -174,10 +174,10 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
-  override def executorDecommissioned(fullId: String, message: String) {
+  override def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo) {
     logInfo("Asked to decommission executor")
-    decommissionExecutor(fullId.split("/")(1))
-    logInfo("Executor %s decommissioned: %s".format(fullId, message))
+    decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+    logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }
 
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index a3e39d7..e091bd0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.deploy.client
 
 import java.io.Closeable
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 
 import scala.concurrent.duration._
 
@@ -32,6 +32,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, Master}
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.util.Utils
 
 /**
@@ -126,7 +127,10 @@ class AppClientSuite
       // Decommissioning is async.
       eventually(timeout(1.seconds), interval(10.millis)) {
         // We only record decommissioning for the executor we've requested
-        assert(ci.listener.execDecommissionedList.size === 1)
+        assert(ci.listener.execDecommissionedMap.size === 1)
+        val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId)
+        assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned,
+          s"$executorId should have been decommissioned along with its worker")
       }
 
       // Send request to kill executor, verify request was made
@@ -215,7 +219,7 @@
     val deadReasonList = new ConcurrentLinkedQueue[String]()
     val execAddedList = new ConcurrentLinkedQueue[String]()
     val execRemovedList = new ConcurrentLinkedQueue[String]()
-    val execDecommissionedList = new ConcurrentLinkedQueue[String]()
+    val execDecommissionedMap = new ConcurrentHashMap[String, ExecutorDecommissionInfo]()
 
     def connected(id: String): Unit = {
       connectedIdList.add(id)
@@ -245,8 +249,9 @@
       execRemovedList.add(id)
     }
 
-    def executorDecommissioned(id: String, message: String): Unit = {
-      execDecommissionedList.add(id)
+    def executorDecommissioned(id: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
+      val previousDecommissionInfo = execDecommissionedMap.putIfAbsent(id, decommissionInfo)
+      assert(previousDecommissionInfo === null, s"Expected no previous decommission info for $id")
     }
 
     def workerRemoved(workerId: String, host: String, message: String): Unit = {}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 664cfc8..c82a5ef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -172,10 +172,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
-    override def executorDecommission(executorId: String) = {}
    override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
     override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
+    override def executorDecommission(
+      executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo): Unit = {}
   }
 
   /**
@@ -777,10 +779,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
         blockManagerId: BlockManagerId,
         executorUpdates: Map[(Int, Int), ExecutorMetrics]): Boolean = true
-      override def executorDecommission(executorId: String): Unit = {}
      override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
       override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
       override def applicationAttemptId(): Option[String] = None
+      override def executorDecommission(
+        executorId: String,
+        decommissionInfo: ExecutorDecommissionInfo): Unit = {}
     }
     val noKillScheduler = new DAGScheduler(
       sc,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 7ead51b..b2a5f77 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -90,7 +90,6 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def notifyPartitionCompletion(stageId: Int, partitionId: Int): Unit = {}
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
-  override def executorDecommission(executorId: String): Unit = {}
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
   override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
@@ -99,4 +98,7 @@ private class DummyTaskScheduler extends TaskScheduler {
     accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
     blockManagerId: BlockManagerId,
     executorMetrics: Map[(Int, Int), ExecutorMetrics]): Boolean = true
+  override def executorDecommission(
+    executorId: String,
+    decommissionInfo: ExecutorDecommissionInfo): Unit = {}
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 4de5aae..d95deb1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -65,7 +65,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
 
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     sc.getExecutorIds().tail.foreach { id =>
-      sched.decommissionExecutor(id)
+      sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
       assert(rdd3.sortByKey().collect().length === 100)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index cd3ab4d..3c34070 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -73,7 +73,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId))
+    execs.foreach(execId => sched.decommissionExecutor(execId, ExecutorDecommissionInfo("", false)))
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
     // Try and launch task after decommissioning, this should fail
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index afcb38b..5741010 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -146,7 +146,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     val execToDecommission = execs.head
     logDebug(s"Decommissioning executor ${execToDecommission}")
-    sched.decommissionExecutor(execToDecommission)
+    sched.decommissionExecutor(execToDecommission, ExecutorDecommissionInfo("", false))
 
     // Wait for job to finish.
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
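
The decision the new field enables boils down to the sketch below. This is
illustrative only, not code from this commit: `ExecutorDecommissionInfo` is
re-declared here so the snippet compiles on its own, and `stopSchedulingOn` /
`unregisterShuffleState` are hypothetical stand-ins for the scheduler
bookkeeping that `CoarseGrainedSchedulerBackend.decommissionExecutor` above
performs.

case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)

object DecommissionSketch {
  // Hypothetical stand-in: stop making resource offers on the executor.
  def stopSchedulingOn(executorId: String): Unit =
    println(s"no longer making resource offers for executor $executorId")

  // Hypothetical stand-in: forget shuffle state registered on the executor.
  def unregisterShuffleState(executorId: String): Unit =
    println(s"clearing shuffle state registered on executor $executorId")

  def executorDecommission(executorId: String, info: ExecutorDecommissionInfo): Unit = {
    stopSchedulingOn(executorId)
    if (info.isHostDecommissioned) {
      // The worker/host is going away too, so shuffle files it serves are lost
      // even when the external shuffle service is enabled.
      unregisterShuffleState(executorId)
    }
    // Otherwise only the executor process exits: an external shuffle service on
    // the still-alive host can keep serving its shuffle files.
  }

  def main(args: Array[String]): Unit = {
    // Standalone Master case: the whole worker is decommissioned (workerLost = true).
    executorDecommission("1",
      ExecutorDecommissionInfo("worker decommissioned", isHostDecommissioned = true))
    // K8s executor pod deletion or YARN container preemption: the host stays up.
    executorDecommission("2",
      ExecutorDecommissionInfo("executor pod deleted", isHostDecommissioned = false))
  }
}

The key design point is that the same DecommissionExecutor message now carries
enough context for the driver to distinguish these two cases, instead of
assuming the host always survives the executor.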