This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 249b214  [SPARK-20732][CORE] Decommission cache blocks to other 
executors when an executor is decommissioned
249b214 is described below

commit 249b214590efc1fc1d385dc8c5321c0ca5f6bab8
Author: Prakhar Jain <prakharjai...@gmail.com>
AuthorDate: Fri Apr 24 11:22:08 2020 -0700

    [SPARK-20732][CORE] Decommission cache blocks to other executors when an 
executor is decommissioned
    
    ### What changes were proposed in this pull request?
    After changes in SPARK-20628, CoarseGrainedSchedulerBackend can 
decommission an executor and stop assigning new tasks to it. We should also 
decommission the corresponding block managers in the same way, i.e. move the 
cached RDD blocks from those executors to other active executors.
    
    ### Why are the changes needed?
    We need to gracefully decommission the block managers so that the 
underlying RDD cache blocks are not lost in case the executors are taken away 
forcefully after some timeout (because of spot loss/pre-emptible VMs, etc.). It's 
good to save as much cache data as possible.
    
    Also, in the future, once the decommissioning signal comes from the Cluster Manager 
(say YARN/Mesos etc.), dynamic allocation + this change gives us the opportunity to 
downscale the executors faster by making the executors free of cache data.
    
    Note that this is a best effort approach. We try to move cache blocks from 
decommissioning executors to active executors. If the active executors don't 
have free resources available on them for caching, then the decommissioning 
executors will keep the cache blocks which they were not able to move and will 
still be able to serve them.
    
    Current overall Flow:
    
    1. CoarseGrainedSchedulerBackend receives a signal to decommissionExecutor. 
On receiving the signal, it does 2 things: it stops assigning new tasks 
(SPARK-20628) and sends another message to BlockManagerMasterEndpoint (via 
BlockManagerMaster) to decommission the corresponding BlockManager.
    
    2. BlockManagerMasterEndpoint receives "DecommissionBlockManagers" message. 
On receiving this, it moves the corresponding block managers to 
"decommissioning" state. All decommissioning BMs are excluded from the getPeers 
RPC call which is used for replication. All these decommissioning BMs are also 
sent a message from BlockManagerMasterEndpoint to start the decommissioning process 
on themselves.
    
    3. BlockManager on worker (say BM-x) receives the 
"DecommissionBlockManager" message. Now it will start 
BlockManagerDecommissionManager thread to offload all the RDD cached blocks. 
This thread can make multiple reattempts to decommission the existing cache 
blocks (multiple reattempts might be needed as there might not be sufficient 
space in other active BMs initially).
    
    ### Does this PR introduce any user-facing change?
    NO
    
    ### How was this patch tested?
    Added UTs.
    
    Closes #27864 from prakharjain09/SPARK-20732-rddcache-1.
    
    Authored-by: Prakhar Jain <prakharjai...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala |  28 +++++
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  17 ++-
 .../org/apache/spark/storage/BlockManager.scala    | 129 +++++++++++++++++++--
 .../apache/spark/storage/BlockManagerMaster.scala  |  10 ++
 .../spark/storage/BlockManagerMasterEndpoint.scala |  50 +++++++-
 .../spark/storage/BlockManagerMessages.scala       |   7 ++
 .../spark/storage/BlockManagerSlaveEndpoint.scala  |   3 +
 .../storage/BlockManagerDecommissionSuite.scala    | 104 +++++++++++++++++
 .../apache/spark/storage/BlockManagerSuite.scala   |  58 +++++++++
 9 files changed, 394 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 5006da0..1bc2734 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -413,6 +413,34 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val STORAGE_DECOMMISSION_ENABLED =
+    ConfigBuilder("spark.storage.decommission.enabled")
+      .doc("Whether to decommission the block manager when decommissioning 
executor")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK =
+    ConfigBuilder("spark.storage.decommission.maxReplicationFailuresPerBlock")
+      .internal()
+      .doc("Maximum number of failures which can be handled for the 
replication of " +
+        "one RDD block when block manager is decommissioning and trying to 
move its " +
+        "existing blocks.")
+      .version("3.1.0")
+      .intConf
+      .createWithDefault(3)
+
+  private[spark] val STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL =
+    ConfigBuilder("spark.storage.decommission.replicationReattemptInterval")
+      .internal()
+      .doc("The interval of time between consecutive cache block replication 
reattempts " +
+        "happening on each decommissioning executor (due to storage 
decommissioning).")
+      .version("3.1.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ > 0, "Time interval between two consecutive attempts of " +
+        "cache block replication should be positive.")
+      .createWithDefaultString("30s")
+
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 701d69b..67638a5 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -438,6 +438,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
             logError(s"Unexpected error during decommissioning ${e.toString}", 
e)
         }
         logInfo(s"Finished decommissioning executor $executorId.")
+
+        if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+          try {
+            logInfo("Starting decommissioning block manager corresponding to " 
+
+              s"executor $executorId.")
+            
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+          } catch {
+            case e: Exception =>
+              logError("Unexpected error during block manager " +
+                s"decommissioning for executor $executorId: ${e.toString}", e)
+          }
+          logInfo(s"Acknowledged decommissioning block manager corresponding 
to $executorId.")
+        }
       } else {
         logInfo(s"Skipping decommissioning of executor $executorId.")
       }
@@ -574,7 +587,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
    */
   private[spark] def decommissionExecutor(executorId: String): Unit = {
     if (driverEndpoint != null) {
-      logInfo("Propegating executor decommission to driver.")
+      logInfo("Propagating executor decommission to driver.")
       driverEndpoint.send(DecommissionExecutor(executorId))
     }
   }
@@ -658,7 +671,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Update the cluster manager on our scheduling needs. Three bits of 
information are included
    * to help it make decisions.
-   * @param resourceProfileToNumExecutors The total number of executors we'd 
like to have per
+   * @param resourceProfileIdToNumExecutors The total number of executors we'd 
like to have per
    *                                      ResourceProfile. The cluster manager 
shouldn't kill any
    *                                      running executor to reach this 
number, but, if all
    *                                      existing executors were to die, this 
is the number
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index e7f8de5..aa15d12 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.{ShuffleManager, ShuffleWriteMetricsReporter}
+import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock
 import org.apache.spark.storage.memory._
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
@@ -241,6 +242,9 @@ private[spark] class BlockManager(
 
   private var blockReplicationPolicy: BlockReplicationPolicy = _
 
+  private var blockManagerDecommissioning: Boolean = false
+  private var decommissionManager: Option[BlockManagerDecommissionManager] = 
None
+
   // A DownloadFileManager used to track all the files of remote blocks which 
are above the
   // specified memory threshold. Files will be deleted automatically based on 
weak reference.
   // Exposed for test
@@ -1551,18 +1555,22 @@ private[spark] class BlockManager(
   }
 
   /**
-   * Called for pro-active replenishment of blocks lost due to executor 
failures
+   * Replicates a block to peer block managers based on existingReplicas and 
maxReplicas
    *
    * @param blockId blockId being replicate
    * @param existingReplicas existing block managers that have a replica
    * @param maxReplicas maximum replicas needed
+   * @param maxReplicationFailures number of replication failures to tolerate 
before
+   *                               giving up.
+   * @return whether block was successfully replicated or not
    */
   def replicateBlock(
       blockId: BlockId,
       existingReplicas: Set[BlockManagerId],
-      maxReplicas: Int): Unit = {
+      maxReplicas: Int,
+      maxReplicationFailures: Option[Int] = None): Boolean = {
     logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
-    blockInfoManager.lockForReading(blockId).foreach { info =>
+    blockInfoManager.lockForReading(blockId).forall { info =>
       val data = doGetLocalBytes(blockId, info)
       val storageLevel = StorageLevel(
         useDisk = info.level.useDisk,
@@ -1570,11 +1578,13 @@ private[spark] class BlockManager(
         useOffHeap = info.level.useOffHeap,
         deserialized = info.level.deserialized,
         replication = maxReplicas)
-      // we know we are called as a result of an executor removal, so we 
refresh peer cache
-      // this way, we won't try to replicate to a missing executor with a 
stale reference
+      // we know we are called as a result of an executor removal or because 
the current executor
+      // is getting decommissioned. so we refresh peer cache before trying 
replication, we won't
+      // try to replicate to a missing executor/another decommissioning 
executor
       getPeers(forceFetch = true)
       try {
-        replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
+        replicate(
+          blockId, data, storageLevel, info.classTag, existingReplicas, 
maxReplicationFailures)
       } finally {
         logDebug(s"Releasing lock for $blockId")
         releaseLockAndDispose(blockId, data)
@@ -1591,9 +1601,11 @@ private[spark] class BlockManager(
       data: BlockData,
       level: StorageLevel,
       classTag: ClassTag[_],
-      existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
+      existingReplicas: Set[BlockManagerId] = Set.empty,
+      maxReplicationFailures: Option[Int] = None): Boolean = {
 
-    val maxReplicationFailures = 
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
+    val maxReplicationFailureCount = maxReplicationFailures.getOrElse(
+      conf.get(config.STORAGE_MAX_REPLICATION_FAILURE))
     val tLevel = StorageLevel(
       useDisk = level.useDisk,
       useMemory = level.useMemory,
@@ -1617,7 +1629,7 @@ private[spark] class BlockManager(
       blockId,
       numPeersToReplicateTo)
 
-    while(numFailures <= maxReplicationFailures &&
+    while(numFailures <= maxReplicationFailureCount &&
       !peersForReplication.isEmpty &&
       peersReplicatedTo.size < numPeersToReplicateTo) {
       val peer = peersForReplication.head
@@ -1665,9 +1677,11 @@ private[spark] class BlockManager(
     if (peersReplicatedTo.size < numPeersToReplicateTo) {
       logWarning(s"Block $blockId replicated to only " +
         s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo 
peers")
+      return false
     }
 
     logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", 
")}")
+    return true
   }
 
   /**
@@ -1761,6 +1775,58 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }
 
+  def decommissionBlockManager(): Unit = {
+    if (!blockManagerDecommissioning) {
+      logInfo("Starting block manager decommissioning process")
+      blockManagerDecommissioning = true
+      decommissionManager = Some(new BlockManagerDecommissionManager(conf))
+      decommissionManager.foreach(_.start())
+    } else {
+      logDebug("Block manager already in decommissioning state")
+    }
+  }
+
+  /**
+   * Tries to offload all cached RDD blocks from this BlockManager to peer 
BlockManagers
+   * Visible for testing
+   */
+  def decommissionRddCacheBlocks(): Unit = {
+    val replicateBlocksInfo = 
master.getReplicateInfoForRDDBlocks(blockManagerId)
+
+    if (replicateBlocksInfo.nonEmpty) {
+      logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
+        "for block manager decommissioning")
+    }
+
+    // Maximum number of storage replication failure which replicateBlock can 
handle
+    val maxReplicationFailures = conf.get(
+      config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+    // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
+    //   so that we end up prioritize them over each other
+    val blocksFailedReplication = ThreadUtils.parmap(
+      replicateBlocksInfo, "decommissionRddCacheBlocks", 4) {
+      case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
+        val replicatedSuccessfully = replicateBlock(
+          blockId,
+          existingReplicas.toSet,
+          maxReplicas,
+          maxReplicationFailures = Some(maxReplicationFailures))
+        if (replicatedSuccessfully) {
+          logInfo(s"Block $blockId offloaded successfully, Removing block now")
+          removeBlock(blockId)
+          logInfo(s"Block $blockId removed")
+        } else {
+          logWarning(s"Failed to offload block $blockId")
+        }
+        (blockId, replicatedSuccessfully)
+    }.filterNot(_._2).map(_._1)
+    if (blocksFailedReplication.nonEmpty) {
+      logWarning("Blocks failed replication in cache decommissioning " +
+        s"process: ${blocksFailedReplication.mkString(",")}")
+    }
+  }
+
   /**
    * Remove all blocks belonging to the given broadcast.
    */
@@ -1829,7 +1895,52 @@ private[spark] class BlockManager(
     data.dispose()
   }
 
+  /**
+   * Class to handle block manager decommissioning retries
+   * It creates a Thread to retry offloading all RDD cache blocks
+   */
+  private class BlockManagerDecommissionManager(conf: SparkConf) {
+    @volatile private var stopped = false
+    private val blockReplicationThread = new Thread {
+      override def run(): Unit = {
+        while (blockManagerDecommissioning && !stopped) {
+          try {
+            logDebug("Attempting to replicate all cached RDD blocks")
+            decommissionRddCacheBlocks()
+            logInfo("Attempt to replicate all cached blocks done")
+            val sleepInterval = conf.get(
+              config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
+            Thread.sleep(sleepInterval)
+          } catch {
+            case _: InterruptedException =>
+              // no-op
+            case NonFatal(e) =>
+              logError("Error occurred while trying to " +
+                "replicate cached RDD blocks for block manager 
decommissioning", e)
+          }
+        }
+      }
+    }
+    blockReplicationThread.setDaemon(true)
+    blockReplicationThread.setName("block-replication-thread")
+
+    def start(): Unit = {
+      logInfo("Starting block replication thread")
+      blockReplicationThread.start()
+    }
+
+    def stop(): Unit = {
+      if (!stopped) {
+        stopped = true
+        logInfo("Stopping block replication thread")
+        blockReplicationThread.interrupt()
+        blockReplicationThread.join()
+      }
+    }
+  }
+
   def stop(): Unit = {
+    decommissionManager.foreach(_.stop())
     blockTransferService.close()
     if (blockStoreClient ne blockTransferService) {
       // Closing should be idempotent, but maybe not for the 
NioBlockTransferService.
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e440c1a..3cfa5d2 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,6 +43,16 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
+  /** Decommission block managers corresponding to given set of executors */
+  def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
+    driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
+  }
+
+  /** Get Replication Info for all the RDD blocks stored in given 
blockManagerId */
+  def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): 
Seq[ReplicateBlock] = {
+    
driverEndpoint.askSync[Seq[ReplicateBlock]](GetReplicateInfoForRDDBlocks(blockManagerId))
+  }
+
   /** Request removal of a dead executor from the driver endpoint.
    *  This is only called on the driver side. Non-blocking
    */
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d7f7eed..d936420 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -65,6 +65,9 @@ class BlockManagerMasterEndpoint(
   // Mapping from executor ID to block manager ID.
   private val blockManagerIdByExecutor = new mutable.HashMap[String, 
BlockManagerId]
 
+  // Set of block managers which are decommissioning
+  private val decommissioningBlockManagerSet = new 
mutable.HashSet[BlockManagerId]
+
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, 
mutable.HashSet[BlockManagerId]]
 
@@ -153,6 +156,13 @@ class BlockManagerMasterEndpoint(
       removeExecutor(execId)
       context.reply(true)
 
+    case DecommissionBlockManagers(executorIds) =>
+      
decommissionBlockManagers(executorIds.flatMap(blockManagerIdByExecutor.get))
+      context.reply(true)
+
+    case GetReplicateInfoForRDDBlocks(blockManagerId) =>
+      context.reply(getReplicateInfoForRDDBlocks(blockManagerId))
+
     case StopBlockManagerMaster =>
       context.reply(true)
       stop()
@@ -257,6 +267,7 @@ class BlockManagerMasterEndpoint(
 
     // Remove the block manager from blockManagerIdByExecutor.
     blockManagerIdByExecutor -= blockManagerId.executorId
+    decommissioningBlockManagerSet.remove(blockManagerId)
 
     // Remove it from blockManagerInfo and remove all the blocks.
     blockManagerInfo.remove(blockManagerId)
@@ -299,6 +310,39 @@ class BlockManagerMasterEndpoint(
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
   }
 
+  /**
+   * Decommission the given Seq of blockmanagers
+   *    - Adds these block managers to decommissioningBlockManagerSet Set
+   *    - Sends the DecommissionBlockManager message to each of the 
[[BlockManagerSlaveEndpoint]]
+   */
+  def decommissionBlockManagers(blockManagerIds: Seq[BlockManagerId]): 
Future[Seq[Unit]] = {
+    val newBlockManagersToDecommission = 
blockManagerIds.toSet.diff(decommissioningBlockManagerSet)
+    val futures = newBlockManagersToDecommission.map { blockManagerId =>
+      decommissioningBlockManagerSet.add(blockManagerId)
+      val info = blockManagerInfo(blockManagerId)
+      info.slaveEndpoint.ask[Unit](DecommissionBlockManager)
+    }
+    Future.sequence{ futures.toSeq }
+  }
+
+  /**
+   * Returns a Seq of ReplicateBlock for each RDD block stored by given 
blockManagerId
+   * @param blockManagerId - block manager id for which ReplicateBlock info is 
needed
+   * @return Seq of ReplicateBlock
+   */
+  private def getReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId): 
Seq[ReplicateBlock] = {
+    val info = blockManagerInfo(blockManagerId)
+
+    val rddBlocks = info.blocks.keySet().asScala.filter(_.isRDD)
+    rddBlocks.map { blockId =>
+      val currentBlockLocations = blockLocations.get(blockId)
+      val maxReplicas = currentBlockLocations.size + 1
+      val remainingLocations = currentBlockLocations.toSeq.filter(bm => bm != 
blockManagerId)
+      val replicateMsg = ReplicateBlock(blockId, remainingLocations, 
maxReplicas)
+      replicateMsg
+    }.toSeq
+  }
+
   // Remove a block from the slaves that have it. This can only be used to 
remove
   // blocks that the master knows about.
   private def removeBlockFromWorkers(blockId: BlockId): Unit = {
@@ -536,7 +580,11 @@ class BlockManagerMasterEndpoint(
   private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
     val blockManagerIds = blockManagerInfo.keySet
     if (blockManagerIds.contains(blockManagerId)) {
-      blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId 
}.toSeq
+      blockManagerIds
+        .filterNot { _.isDriver }
+        .filterNot { _ == blockManagerId }
+        .diff(decommissioningBlockManagerSet)
+        .toSeq
     } else {
       Seq.empty
     }
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 895f48d..7d4f2ff 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -36,6 +36,8 @@ private[spark] object BlockManagerMessages {
   case class ReplicateBlock(blockId: BlockId, replicas: Seq[BlockManagerId], 
maxReplicas: Int)
     extends ToBlockManagerSlave
 
+  case object DecommissionBlockManager extends ToBlockManagerSlave
+
   // Remove all blocks belonging to a specific RDD.
   case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
 
@@ -125,6 +127,11 @@ private[spark] object BlockManagerMessages {
 
   case object GetStorageStatus extends ToBlockManagerMaster
 
+  case class DecommissionBlockManagers(executorIds: Seq[String]) extends 
ToBlockManagerMaster
+
+  case class GetReplicateInfoForRDDBlocks(blockManagerId: BlockManagerId)
+    extends ToBlockManagerMaster
+
   case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
     extends ToBlockManagerMaster
 
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
index 29e2114..a3a7149 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -61,6 +61,9 @@ class BlockManagerSlaveEndpoint(
         SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
       }
 
+    case DecommissionBlockManager =>
+      context.reply(blockManager.decommissionBlockManager())
+
     case RemoveBroadcast(broadcastId, _) =>
       doAsync[Int]("removing broadcast " + broadcastId, context) {
         blockManager.removeBroadcast(broadcastId, tellMaster = true)
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
new file mode 100644
index 0000000..59fb056
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.util.concurrent.Semaphore
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite, Success}
+import org.apache.spark.internal.config
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, 
SparkListenerTaskStart}
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+import org.apache.spark.util.ThreadUtils
+
+class BlockManagerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
+
+  override def beforeEach(): Unit = {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+      .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 100L)
+      .set(config.STORAGE_DECOMMISSION_ENABLED, true)
+
+    sc = new SparkContext("local-cluster[3, 1, 1024]", "test", conf)
+  }
+
+  test(s"verify that an already running task which is going to cache data 
succeeds " +
+    s"on a decommissioned executor") {
+    // Create input RDD with 10 partitions
+    val input = sc.parallelize(1 to 10, 10)
+    val accum = sc.longAccumulator("mapperRunAccumulator")
+    // Do a count to wait for the executors to be registered.
+    input.count()
+
+    // Create a new RDD where we have sleep in each partition, we are also 
increasing
+    // the value of accumulator in each partition
+    val sleepyRdd = input.mapPartitions { x =>
+      Thread.sleep(500)
+      accum.add(1)
+      x
+    }
+
+    // Listen for the job
+    val sem = new Semaphore(0)
+    val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
+    sc.addSparkListener(new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+       sem.release()
+      }
+
+      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+        taskEndEvents.append(taskEnd)
+      }
+    })
+
+    // Cache the RDD lazily
+    sleepyRdd.persist()
+
+    // Start the computation of RDD - this step will also cache the RDD
+    val asyncCount = sleepyRdd.countAsync()
+
+    // Wait for the job to have started
+    sem.acquire(1)
+
+    // Decommission one of the executor
+    val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+    val execs = sched.getExecutorIds()
+    assert(execs.size == 3, s"Expected 3 executors but found ${execs.size}")
+    val execToDecommission = execs.head
+    sched.decommissionExecutor(execToDecommission)
+
+    // Wait for job to finish
+    val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 3.seconds)
+    assert(asyncCountResult === 10)
+    // All 10 tasks finished, so accum should have been increased 10 times
+    assert(accum.value === 10)
+
+    // All tasks should be successful, nothing should have failed
+    sc.listenerBus.waitUntilEmpty()
+    assert(taskEndEvents.size === 10) // 10 mappers
+    assert(taskEndEvents.map(_.reason).toSet === Set(Success))
+
+    // Since the RDD is cached, so further usage of same RDD should use the
+    // cached data. Original RDD partitions should not be recomputed i.e. accum
+    // should have same value like before
+    assert(sleepyRdd.count() === 10)
+    assert(accum.value === 10)
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 8d06768..eb875dc 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -1706,6 +1706,64 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     verify(liveListenerBus, 
never()).post(SparkListenerBlockUpdated(BlockUpdatedInfo(updateInfo)))
   }
 
+  test("test decommission block manager should not be part of peers") {
+    val exec1 = "exec1"
+    val exec2 = "exec2"
+    val exec3 = "exec3"
+    val store1 = makeBlockManager(2000, exec1)
+    val store2 = makeBlockManager(2000, exec2)
+    val store3 = makeBlockManager(2000, exec3)
+
+    assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === 
Set(exec1, exec2))
+
+    val data = new Array[Byte](400)
+    val blockId = rdd(0, 0)
+    store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
+    assert(master.getLocations(blockId).size === 2)
+
+    master.decommissionBlockManagers(Seq(exec1))
+    // store1 is decommissioned, so it should not be part of peer list for 
store3
+    assert(master.getPeers(store3.blockManagerId).map(_.executorId).toSet === 
Set(exec2))
+  }
+
+  test("test decommissionRddCacheBlocks should offload all cached blocks") {
+    val store1 = makeBlockManager(2000, "exec1")
+    val store2 = makeBlockManager(2000, "exec2")
+    val store3 = makeBlockManager(2000, "exec3")
+
+    val data = new Array[Byte](400)
+    val blockId = rdd(0, 0)
+    store1.putSingle(blockId, data, StorageLevel.MEMORY_ONLY_2)
+    assert(master.getLocations(blockId).size === 2)
+    assert(master.getLocations(blockId).contains(store1.blockManagerId))
+
+    store1.decommissionRddCacheBlocks()
+    assert(master.getLocations(blockId).size === 2)
+    assert(master.getLocations(blockId).toSet === Set(store2.blockManagerId,
+      store3.blockManagerId))
+  }
+
+  test("test decommissionRddCacheBlocks should keep the block if it is not 
able to offload") {
+    val store1 = makeBlockManager(12000, "exec1")
+    val store2 = makeBlockManager(2000, "exec2")
+
+    val dataLarge = new Array[Byte](5000)
+    val blockIdLarge = rdd(0, 0)
+    val dataSmall = new Array[Byte](500)
+    val blockIdSmall = rdd(0, 1)
+
+    store1.putSingle(blockIdLarge, dataLarge, StorageLevel.MEMORY_ONLY)
+    store1.putSingle(blockIdSmall, dataSmall, StorageLevel.MEMORY_ONLY)
+    assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
+    assert(master.getLocations(blockIdSmall) === Seq(store1.blockManagerId))
+
+    store1.decommissionRddCacheBlocks()
+    // Smaller block offloaded to store2
+    assert(master.getLocations(blockIdSmall) === Seq(store2.blockManagerId))
+    // Larger block still present in store1 as it can't be offloaded
+    assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends 
BlockTransferService {
     var numCalls = 0
     var tempFileManager: DownloadFileManager = null


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to