This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new dca8380 [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation dca8380 is described below commit dca838058ffd0e2c01591fd9ab0f192de446d606 Author: Marcelo Vanzin <van...@cloudera.com> AuthorDate: Thu Jan 16 13:37:11 2020 -0800 [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation The issue here is that when Spark is downscaling the application and deletes a few pod requests that aren't needed anymore, it may actually race with the K8S scheduler, which may be bringing up those executors. So they may have enough time to connect back to the driver and register, only to be deleted soon after. This wastes resources and causes misleading entries in the driver log. The change (ab)uses the blacklisting mechanism to consider the deleted excess pods as blacklisted, so that if they try to connect back, the driver will deny their registration. It also changes the executor registration slightly, since even with the above change there were misleading logs. That was because the executor registration message was an RPC that always succeeded (bar network issues), so the executor would always try to send an unregistration message to the driver, which would then log several messages about not knowing anything about the executor. The change makes the registration RPC succeed or fail directly, instead of using the separate failure message that would lead to this issue. Note that the last change required some changes in a standalone test suite related to dynamic allocation, since it relied on the driver not throwing exceptions when a duplicate executor registration happened. Tested with existing unit tests, and on a live cluster with dynamic allocation enabled. Closes #26586 from vanzin/SPARK-29950. 
Authored-by: Marcelo Vanzin <van...@cloudera.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../executor/CoarseGrainedExecutorBackend.scala | 14 +++-- .../cluster/CoarseGrainedClusterMessage.scala | 7 --- .../cluster/CoarseGrainedSchedulerBackend.scala | 19 +++++-- .../deploy/StandaloneDynamicAllocationSuite.scala | 65 ++++++++++++++-------- .../CoarseGrainedSchedulerBackendSuite.scala | 1 + .../cluster/k8s/ExecutorPodsAllocator.scala | 18 ++++++ .../k8s/KubernetesClusterSchedulerBackend.scala | 4 ++ .../DeterministicExecutorPodsSnapshotsStore.scala | 9 +++ .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 11 ++++ 9 files changed, 105 insertions(+), 43 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b1837c9..1fe901a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend( resourcesFileOpt: Option[String]) extends IsolatedRpcEndpoint with ExecutorBackend with Logging { + import CoarseGrainedExecutorBackend._ + private implicit val formats = DefaultFormats private[this] val stopping = new AtomicBoolean(false) @@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend( ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls, extractAttributes, resources)) }(ThreadUtils.sameThread).onComplete { - // This is a very fast action so we can use "ThreadUtils.sameThread" - case Success(msg) => - // Always receive `true`. 
Just ignore it + case Success(_) => + self.send(RegisteredExecutor) case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) }(ThreadUtils.sameThread) @@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend( exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } - case RegisterExecutorFailed(message) => - exitExecutor(1, "Slave registration failed: " + message) - case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") @@ -226,6 +224,10 @@ private[spark] class CoarseGrainedExecutorBackend( private[spark] object CoarseGrainedExecutorBackend extends Logging { + // Message used internally to start the executor when the driver successfully accepted the + // registration request. + case object RegisteredExecutor + case class Arguments( driverUrl: String, executorId: String, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 9ce2368..57317e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages { case class KillExecutorsOnHost(host: String) extends CoarseGrainedClusterMessage - sealed trait RegisterExecutorResponse - - case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse - - case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage - with RegisterExecutorResponse - case class UpdateDelegationTokens(tokens: Array[Byte]) extends CoarseGrainedClusterMessage diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 7c7d8c2..031b9af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, attributes, resources) => if (executorDataMap.contains(executorId)) { - executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId)) - context.reply(true) - } else if (scheduler.nodeBlacklist.contains(hostname)) { + context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId")) + } else if (scheduler.nodeBlacklist.contains(hostname) || + isBlacklisted(executorId, hostname)) { // If the cluster manager gives us an executor on a blacklisted node (because it // already started allocating those resources before we informed it of our blacklist, // or if it ignored our blacklist), then we reject that executor immediately. logInfo(s"Rejecting $executorId as it has been blacklisted.") - executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId")) - context.reply(true) + context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId")) } else { // If the executor's rpc env is not listening for incoming connections, `hostPort` // will be null, and the client connection should be used to contact the executor. 
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp logDebug(s"Decremented number of pending executors ($numPendingExecutors left)") } } - executorRef.send(RegisteredExecutor) // Note: some tests expect the reply to come after we put the executor in the map context.reply(true) listenerBus.post( @@ -776,6 +774,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def currentDelegationTokens: Array[Byte] = delegationTokens.get() + /** + * Checks whether the executor is blacklisted. This is called when the executor tries to + * register with the scheduler, and will deny registration if this method returns true. + * + * This is in addition to the blacklist kept by the task scheduler, so custom implementations + * don't need to check there. + */ + protected def isBlacklisted(executorId: String, hostname: String): Boolean = false + // SPARK-27112: We need to ensure that there is ordering of lock acquisition // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix // the deadlock issue exposed in SPARK-27112 diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index dd790b8..e316da7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ -import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.ApplicationInfo import org.apache.spark.deploy.master.Master import org.apache.spark.deploy.worker.Worker @@ -34,7 +34,7 @@ import 
org.apache.spark.internal.config import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster._ -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor} /** * End-to-end tests for dynamic allocation in standalone mode. @@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite assert(apps.head.getExecutorLimit === Int.MaxValue) } val beforeList = getApplications().head.executors.keys.toSet - assert(killExecutorsOnHost(sc, "localhost").equals(true)) - syncExecutors(sc) - val afterList = getApplications().head.executors.keys.toSet + + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + b.killExecutorsOnHost("localhost") + case _ => fail("expected coarse grained scheduler") + } eventually(timeout(10.seconds), interval(100.millis)) { + val afterList = getApplications().head.executors.keys.toSet assert(beforeList.intersect(afterList).size == 0) } } @@ -514,10 +518,11 @@ class StandaloneDynamicAllocationSuite val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv) try { scheduler.start() - scheduler.driverEndpoint.ask[Boolean](message) - eventually(timeout(10.seconds), interval(100.millis)) { - verify(endpointRef).send(RegisterExecutorFailed(any())) + val e = intercept[SparkException] { + scheduler.driverEndpoint.askSync[Boolean](message) } + assert(e.getCause().isInstanceOf[IllegalStateException]) + assert(scheduler.getExecutorIds().isEmpty) } finally { scheduler.stop() } @@ -536,6 +541,11 @@ class StandaloneDynamicAllocationSuite .setMaster(masterRpcEnv.address.toSparkURL) .setAppName("test") .set(config.EXECUTOR_MEMORY.key, "256m") + // Because we're faking executor launches in the Worker, set the config so that the driver + // will not timeout 
anything related to executors. + .set(config.Network.NETWORK_TIMEOUT.key, "2h") + .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h") + .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h") } /** Make a master to which our application will send executor requests. */ @@ -549,8 +559,7 @@ class StandaloneDynamicAllocationSuite private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = { (0 until numWorkers).map { i => val rpcEnv = workerRpcEnvs(i) - val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), - Worker.ENDPOINT_NAME, null, conf, securityManager) + val worker = new TestWorker(rpcEnv, cores, memory) rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker) worker } @@ -588,16 +597,6 @@ class StandaloneDynamicAllocationSuite } } - /** Kill the executors on a given host. */ - private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = { - syncExecutors(sc) - sc.schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - b.killExecutorsOnHost(host) - case _ => fail("expected coarse grained scheduler") - } - } - /** * Return a list of executor IDs belonging to this application. * @@ -620,9 +619,8 @@ class StandaloneDynamicAllocationSuite * we submit a request to kill them. This must be called before each kill request. 
*/ private def syncExecutors(sc: SparkContext): Unit = { - val driverExecutors = sc.env.blockManager.master.getStorageStatus - .map(_.blockManagerId.executorId) - .filter { _ != SparkContext.DRIVER_IDENTIFIER} + val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] + val driverExecutors = backend.getExecutorIds() val masterExecutors = getExecutorIds(sc) val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted missingExecutors.foreach { id => @@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite when(endpointRef.address).thenReturn(mockAddress) val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty, Map.empty) - val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend] backend.driverEndpoint.askSync[Boolean](message) backend.driverEndpoint.send(LaunchedExecutor(id)) } } + /** + * Worker implementation that does not actually launch any executors, but reports them as + * running so the Master keeps track of them. This requires that `syncExecutors` be used + * to make sure the Master instance and the SparkContext under test agree about what + * executors are running. 
+ */ + private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int) + extends Worker( + rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME, + null, conf, securityManager) { + + override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive) + + private def testReceive: PartialFunction[Any, Unit] = synchronized { + case LaunchExecutor(_, appId, execId, _, _, _, _) => + self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)) + } + + } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index f916f63..29160a3 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo val conf = new SparkConf() .set(EXECUTOR_CORES, 1) .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test + .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations .setMaster( "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]") .setAppName("test") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 2201bf9..b394f35 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator( private var lastSnapshot = 
ExecutorPodsSnapshot(Nil) + // Executors that have been deleted by this allocator but not yet detected as deleted in + // a snapshot from the API server. This is used to deny registration from these executors + // if they happen to come up before the deletion takes effect. + @volatile private var deletedExecutorIds = Set.empty[Long] + def start(applicationId: String): Unit = { snapshotsStore.addSubscriber(podAllocationDelay) { onNewSnapshots(applicationId, _) @@ -85,6 +90,8 @@ private[spark] class ExecutorPodsAllocator( } } + def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong) + private def onNewSnapshots( applicationId: String, snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized { @@ -141,10 +148,17 @@ private[spark] class ExecutorPodsAllocator( } .map { case (id, _) => id } + // Make a local, non-volatile copy of the reference since it's used multiple times. This + // is the only method that modifies the list, so this is safe. + var _deletedExecutorIds = deletedExecutorIds + if (snapshots.nonEmpty) { logDebug(s"Pod allocation status: $currentRunningCount running, " + s"${currentPendingExecutors.size} pending, " + s"${newlyCreatedExecutors.size} unacknowledged.") + + val existingExecs = lastSnapshot.executorPods.keySet + _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains) } val currentTotalExpectedExecutors = totalExpectedExecutors.get @@ -169,6 +183,8 @@ private[spark] class ExecutorPodsAllocator( if (toDelete.nonEmpty) { logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).") + _deletedExecutorIds = _deletedExecutorIds ++ toDelete + Utils.tryLogNonFatalError { kubernetesClient .pods() @@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator( } } + deletedExecutorIds = _deletedExecutorIds + // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this // update method when not needed. 
hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index e221a92..105841a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend( Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)) } + override protected def isBlacklisted(executorId: String, hostname: String): Boolean = { + podAllocator.isDeleted(executorId) + } + private class KubernetesDriverEndpoint extends DriverEndpoint { override def onDisconnected(rpcAddress: RpcAddress): Unit = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala index 1b6dfe5..9ac7e02 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala @@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore currentSnapshot = ExecutorPodsSnapshot(newSnapshot) snapshotsBuffer += currentSnapshot } + + def removeDeletedExecutors(): Unit = { + val nonDeleted = currentSnapshot.executorPods.filter { + case (_, PodDeleted(_)) => false + case _ => true + 
} + currentSnapshot = ExecutorPodsSnapshot(nonDeleted) + snapshotsBuffer += currentSnapshot + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 4475d5d..a0abded 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(podOperations, times(4)).create(any()) verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4") verify(podOperations).delete() + assert(podsAllocatorUnderTest.isDeleted("3")) + assert(podsAllocatorUnderTest.isDeleted("4")) + + // Update the snapshot to not contain the deleted executors, make sure the + // allocator cleans up internal state. + snapshotsStore.updatePod(deletedExecutor(3)) + snapshotsStore.updatePod(deletedExecutor(4)) + snapshotsStore.removeDeletedExecutors() + snapshotsStore.notifySubscribers() + assert(!podsAllocatorUnderTest.isDeleted("3")) + assert(!podsAllocatorUnderTest.isDeleted("4")) } private def executorPodAnswer(): Answer[SparkPod] = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org