This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 375d348  [SPARK-31197][CORE] Shutdown executor once we are done 
decommissioning
375d348 is described below

commit 375d348a83e6ffa38dfaece5047633f67aee1da5
Author: Holden Karau <hka...@apple.com>
AuthorDate: Wed Aug 5 16:28:14 2020 -0700

    [SPARK-31197][CORE] Shutdown executor once we are done decommissioning
    
    ### What changes were proposed in this pull request?
    
    Exit the executor when it has been asked to decommission and there is 
nothing left for it to do.
    
    This is a rebase of https://github.com/apache/spark/pull/28817
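
    As a rough illustration (a simplified sketch, not the committed code), the new behavior amounts to a background check that exits the executor once no tasks are running and, when storage decommissioning is enabled, block migration has finished. The helper names below (`numRunningTasks`, `allBlocksMigrated`, `exitExecutor`) are stand-ins for the corresponding members of `CoarseGrainedExecutorBackend` and `BlockManager`:

    ```scala
    // Hypothetical stand-alone sketch of the shutdown check; not Spark's actual API surface.
    object DecommissionExitSketch {
      def main(args: Array[String]): Unit = {
        // Fake state: pretend tasks drain over a few polls, after which blocks are migrated.
        var remainingTasks = 3
        def numRunningTasks(): Int = { if (remainingTasks > 0) remainingTasks -= 1; remainingTasks }
        def allBlocksMigrated(): Boolean = remainingTasks == 0
        def exitExecutor(msg: String): Unit = println(s"Exiting executor: $msg")

        val storageDecommissionEnabled = true
        var done = false
        while (!done) {
          Thread.sleep(100) // the real loop polls roughly once per second
          if (numRunningTasks() == 0 && (!storageDecommissionEnabled || allBlocksMigrated())) {
            // No running tasks and no pending block migrations: safe to exit with code 0.
            exitExecutor("Finished decommissioning")
            done = true
          }
          // Otherwise keep waiting; a running task may still produce new blocks to migrate.
        }
      }
    }
    ```

    The committed version additionally requires that the most recent migration pass began after the last time a task was observed running, so a task completing mid-check cannot race with the "all blocks migrated" signal.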
    
    ### Why are the changes needed?
    
    If we want to use decommissioning in Spark's own scale-down, we should terminate the executor once it has finished decommissioning.
    Furthermore, during graceful shutdown it makes sense to release resources we no longer need when we've been asked to shut down by the cluster manager, rather than always holding on to them as long as possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Decommissioned executors will exit at the end of decommissioning. This is technically a user-facing change; however, decommissioning has not been in any release yet.
    
    ### How was this patch tested?
    
    I changed the unit test so that it no longer sends the executor exit message but still waits for the executor-exited message.
    
    Closes #29211 from holdenk/SPARK-31197-exit-execs-redone.
    
    Authored-by: Holden Karau <hka...@apple.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../org/apache/spark/deploy/DeployMessage.scala    |   2 -
 .../org/apache/spark/deploy/worker/Worker.scala    |   2 +-
 .../executor/CoarseGrainedExecutorBackend.scala    |  58 ++++++++-
 .../cluster/CoarseGrainedClusterMessage.scala      |   3 +
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  10 ++
 .../org/apache/spark/storage/BlockManager.scala    |   8 ++
 .../spark/storage/BlockManagerDecommissioner.scala |  96 +++++++++++---
 .../spark/scheduler/WorkerDecommissionSuite.scala  |  19 ++-
 .../BlockManagerDecommissionIntegrationSuite.scala |  17 ++-
 .../BlockManagerDecommissionUnitSuite.scala        | 139 ++++++++++++++++++++-
 10 files changed, 310 insertions(+), 44 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala 
b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c8c6e5a..b7a64d75 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -165,8 +165,6 @@ private[deploy] object DeployMessages {
 
   case object ReregisterWithMaster // used when a worker attempts to reconnect 
to a master
 
-  case object DecommissionSelf // Mark as decommissioned. May be Master to 
Worker in the future.
-
   // AppClient to Master
 
   case class RegisterApplication(appDescription: ApplicationDescription, 
driver: RpcEndpointRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index aa8c46f..862e685 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -668,7 +668,7 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case DecommissionSelf =>
+    case WorkerDecommission(_, _) =>
       decommissionSelf()
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index def125b..55fb76b 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private[this] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
-  @volatile private var decommissioned = false
   @volatile var driver: Option[RpcEndpointRef] = None
 
   // If this CoarseGrainedExecutorBackend is changed to support multiple 
threads, then this may need
@@ -80,6 +79,8 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, 
ResourceInformation]]
 
+  @volatile private var decommissioned = false
+
   override def onStart(): Unit = {
     logInfo("Registering PWR handler.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
@@ -214,6 +215,10 @@ private[spark] class CoarseGrainedExecutorBackend(
     case UpdateDelegationTokens(tokenBytes) =>
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+
+    case DecommissionSelf =>
+      logInfo("Received decommission self")
+      decommissionSelf()
   }
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -277,12 +282,59 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor != null) {
         executor.decommission()
       }
-      logInfo("Done decommissioning self.")
+      // Shut down the executor once all tasks are gone & any configured migrations completed.
+      // Detecting migration completion doesn't need to be perfect, and we want to minimize the
+      // overhead for executors that are not in a decommissioning state, since overall those will
+      // be most of the executors. For example, this will not catch a block which is already in
+      // the process of being put from a remote executor before migration starts. This trade-off
+      // is viewed as acceptable to minimize the introduction of any new locking structures in
+      // critical code paths.
+
+      val shutdownThread = new Thread("wait-for-blocks-to-migrate") {
+        override def run(): Unit = {
+          var lastTaskRunningTime = System.nanoTime()
+          val sleep_time = 1000 // 1s
+
+          while (true) {
+            logInfo("Checking to see if we can shutdown.")
+            Thread.sleep(sleep_time)
+            if (executor == null || executor.numRunningTasks == 0) {
+              if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+                logInfo("No running tasks, checking migrations")
+                val (migrationTime, allBlocksMigrated) = 
env.blockManager.lastMigrationInfo()
+                // We can only trust the allBlocksMigrated boolean value if no tasks have been
+                // running since we started computing it.
+                if (allBlocksMigrated && (migrationTime > 
lastTaskRunningTime)) {
+                  logInfo("No running tasks, all blocks migrated, stopping.")
+                  exitExecutor(0, "Finished decommissioning", notifyDriver = 
true)
+                } else {
+                  logInfo("All blocks not yet migrated.")
+                }
+              } else {
+                logInfo("No running tasks, no block migration configured, 
stopping.")
+                exitExecutor(0, "Finished decommissioning", notifyDriver = 
true)
+              }
+            } else {
+              logInfo("Blocked from shutdown by running 
${executor.numRunningtasks} tasks")
+              // If there is a running task it could store blocks, so make 
sure we wait for a
+              // migration loop to complete after the last task is done.
+              // Note: this is only advanced if there is a running task; if there is
+              // no running task but the blocks are not done migrating, this does not
+              // move forward.
+              lastTaskRunningTime = System.nanoTime()
+            }
+          }
+        }
+      }
+      shutdownThread.setDaemon(true)
+      shutdownThread.start()
+
+      logInfo("Will exit when finished decommissioning")
       // Return true since we are handling a signal
       true
     } catch {
       case e: Exception =>
-        logError(s"Error ${e} during attempt to decommission self")
+        logError("Unexpected error while decommissioning self", e)
         false
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 91485f0..7242ab7 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // The message to check if `CoarseGrainedSchedulerBackend` thinks the 
executor is alive or not.
   case class IsExecutorAlive(executorId: String) extends 
CoarseGrainedClusterMessage
+
+  // Used to ask an executor to decommission itself. (Can be an internal 
message)
+  case object DecommissionSelf extends CoarseGrainedClusterMessage
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8fbefae..d81a617 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -442,6 +442,16 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           case e: Exception =>
             logError(s"Unexpected error during decommissioning ${e.toString}", 
e)
         }
+        // Send a decommission message to the executor. This may be a duplicate, since the
+        // executor could have been the one that notified us, but it's also possible the
+        // notification came from elsewhere and the executor does not yet know.
+        executorDataMap.get(executorId) match {
+          case Some(executorInfo) =>
+            executorInfo.executorEndpoint.send(DecommissionSelf)
+          case None =>
+            // Ignoring the executor since it is not registered.
+            logWarning(s"Attempted to decommission unknown executor 
$executorId.")
+        }
         logInfo(s"Finished decommissioning executor $executorId.")
 
         if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 47af854..6ec93df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
     }
   }
 
+  /*
+   *  Returns the last migration time and a boolean denoting whether all the blocks have been
+   *  migrated. If any tasks have been running since that time, the boolean may be incorrect.
+   */
+  private[spark] def lastMigrationInfo(): (Long, Boolean) = {
+    decommissioner.map(_.lastMigrationInfo()).getOrElse((0, false))
+  }
+
   private[storage] def getMigratableRDDBlocks(): Seq[ReplicateBlock] =
     master.getReplicateInfoForRDDBlocks(blockManagerId)
 
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 1cc7ef6..f0a8e47 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.storage
 
 import java.util.concurrent.ExecutorService
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
   private val maxReplicationFailuresForDecommission =
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
 
+  // Used for tracking if our migrations are complete. Readable for testing
+  @volatile private[storage] var lastRDDMigrationTime: Long = 0
+  @volatile private[storage] var lastShuffleMigrationTime: Long = 0
+  @volatile private[storage] var rddBlocksLeft: Boolean = true
+  @volatile private[storage] var shuffleBlocksLeft: Boolean = true
+
   /**
    * This runnable consumes any shuffle blocks in the queue for migration. 
This part of a
    * producer/consumer where the main migration loop updates the queue of 
blocks to be migrated
@@ -91,10 +98,11 @@ private[storage] class BlockManagerDecommissioner(
                     null)// class tag, we don't need for shuffle
                   logDebug(s"Migrated sub block ${blockId}")
                 }
-                logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+                logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
               } else {
                 logError(s"Skipping block ${shuffleBlockInfo} because it has 
failed ${retryCount}")
               }
+              numMigratedShuffles.incrementAndGet()
           }
         }
         // This catch is intentionally outside of the while running block.
@@ -115,12 +123,21 @@ private[storage] class BlockManagerDecommissioner(
   // Shuffles which are either in queue for migrations or migrated
   private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
 
+  // Shuffles which have been migrated. This is used to know when we are "done"; being done can
+  // change if a new shuffle file is created by a running task.
+  private val numMigratedShuffles = new AtomicInteger(0)
+
   // Shuffles which are queued for migration & number of retries so far.
+  // Visible in storage for testing.
   private[storage] val shufflesToMigrate =
     new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
 
   // Set if we encounter an error attempting to migrate and stop.
   @volatile private var stopped = false
+  @volatile private var stoppedRDD =
+    !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
+  @volatile private var stoppedShuffle =
+    !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
 
   private val migrationPeers =
     mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
@@ -133,22 +150,31 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run(): Unit = {
       assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedRDD && !Thread.interrupted()) {
         logInfo("Iterating on migrating from the block manager.")
+        // Validate we have peers to migrate to.
+        val peers = bm.getPeers(false)
+        // If we have no peers give up.
+        if (peers.isEmpty) {
+          stopped = true
+          stoppedRDD = true
+        }
         try {
+          val startTime = System.nanoTime()
           logDebug("Attempting to replicate all cached RDD blocks")
-          decommissionRddCacheBlocks()
+          rddBlocksLeft = decommissionRddCacheBlocks()
+          lastRDDMigrationTime = startTime
           logInfo("Attempt to replicate all cached blocks done")
           logInfo(s"Waiting for ${sleepInterval} before refreshing 
migrations.")
           Thread.sleep(sleepInterval)
         } catch {
           case e: InterruptedException =>
-            logInfo("Interrupted during migration, will not refresh 
migrations.")
-            stopped = true
+            logInfo("Interrupted during RDD migration, stopping")
+            stoppedRDD = true
           case NonFatal(e) =>
-            logError("Error occurred while trying to replicate for block 
manager decommissioning.",
+            logError("Error occurred replicating RDD for block manager 
decommissioning.",
               e)
-            stopped = true
+            stoppedRDD = true
         }
       }
     }
@@ -162,20 +188,22 @@ private[storage] class BlockManagerDecommissioner(
 
     override def run() {
       assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
-      while (!stopped && !Thread.interrupted()) {
+      while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
         try {
           logDebug("Attempting to replicate all shuffle blocks")
-          refreshOffloadingShuffleBlocks()
+          val startTime = System.nanoTime()
+          shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
+          lastShuffleMigrationTime = startTime
           logInfo("Done starting workers to migrate shuffle blocks")
           Thread.sleep(sleepInterval)
         } catch {
           case e: InterruptedException =>
             logInfo("Interrupted during migration, will not refresh 
migrations.")
-            stopped = true
+            stoppedShuffle = true
           case NonFatal(e) =>
             logError("Error occurred while trying to replicate for block 
manager decommissioning.",
               e)
-            stopped = true
+            stoppedShuffle = true
         }
       }
     }
@@ -191,8 +219,9 @@ private[storage] class BlockManagerDecommissioner(
    * but rather shadows them.
    * Requires an Indexed based shuffle resolver.
    * Note: if called in testing please call stopOffloadingShuffleBlocks to 
avoid thread leakage.
+   * Returns true if we are not done migrating shuffle blocks.
    */
-  private[storage] def refreshOffloadingShuffleBlocks(): Unit = {
+  private[storage] def refreshOffloadingShuffleBlocks(): Boolean = {
     // Update the queue of shuffles to be migrated
     logInfo("Offloading shuffle blocks")
     val localShuffles = bm.migratableResolver.getStoredShuffles().toSet
@@ -215,6 +244,12 @@ private[storage] class BlockManagerDecommissioner(
     deadPeers.foreach { peer =>
         migrationPeers.get(peer).foreach(_.running = false)
     }
+    // If we don't have anyone to migrate to give up
+    if (migrationPeers.values.find(_.running == true).isEmpty) {
+      stoppedShuffle = true
+    }
+    // If we found any new shuffles to migrate or otherwise have not migrated everything.
+    newShufflesToMigrate.nonEmpty || migratingShuffles.size > numMigratedShuffles.get()
   }
 
   /**
@@ -231,16 +266,18 @@ private[storage] class BlockManagerDecommissioner(
   /**
    * Tries to offload all cached RDD blocks from this BlockManager to peer 
BlockManagers
    * Visible for testing
+   * Returns true if we have not migrated all of our RDD blocks.
    */
-  private[storage] def decommissionRddCacheBlocks(): Unit = {
+  private[storage] def decommissionRddCacheBlocks(): Boolean = {
     val replicateBlocksInfo = bm.getMigratableRDDBlocks()
+    // Refresh peers and validate we have somewhere to move blocks.
 
     if (replicateBlocksInfo.nonEmpty) {
       logInfo(s"Need to replicate ${replicateBlocksInfo.size} RDD blocks " +
         "for block manager decommissioning")
     } else {
       logWarning(s"Asked to decommission RDD cache blocks, but no blocks to 
migrate")
-      return
+      return false
     }
 
     // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
@@ -252,7 +289,9 @@ private[storage] class BlockManagerDecommissioner(
     if (blocksFailedReplication.nonEmpty) {
       logWarning("Blocks failed replication in cache decommissioning " +
         s"process: ${blocksFailedReplication.mkString(",")}")
+      return true
     }
+    return false
   }
 
   private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
@@ -327,4 +366,33 @@ private[storage] class BlockManagerDecommissioner(
     }
     logInfo("Stopped storage decommissioner")
   }
+
+  /*
+   *  Returns the last migration time and a boolean denoting whether all blocks have been
+   *  migrated. The last migration time is calculated as the minimum of the last migration time
+   *  of any running migration (and if there are no currently running migrations it is set to
+   *  the current time). This provides a timestamp such that, if no tasks have been running
+   *  since that time, we know that all blocks that could be migrated have been migrated off.
+   */
+  private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+    if (stopped || (stoppedRDD && stoppedShuffle)) {
+      // Since we don't have anything left to migrate ever (since we don't 
restart once
+      // stopped), return that we're done with a validity timestamp that 
doesn't expire.
+      (Long.MaxValue, true)
+    } else {
+      // Choose the min of the active times. See the function description for more information.
+      val lastMigrationTime = if (!stoppedRDD && !stoppedShuffle) {
+        Math.min(lastRDDMigrationTime, lastShuffleMigrationTime)
+      } else if (!stoppedShuffle) {
+        lastShuffleMigrationTime
+      } else {
+        lastRDDMigrationTime
+      }
+
+      // Technically we could have blocks left if we encountered an error, but 
those blocks will
+      // never be migrated, so we don't care about them.
+      val blocksMigrated = (!shuffleBlocksLeft || stoppedShuffle) && 
(!rddBlocksLeft || stoppedRDD)
+      (lastMigrationTime, blocksMigrated)
+    }
+  }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 3c34070..bb0c33a 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -47,7 +47,12 @@ class WorkerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
     assert(sleepyRdd.count() === 10)
   }
 
-  test("verify a task with all workers decommissioned succeeds") {
+  test("verify a running task with all workers decommissioned succeeds") {
+    // Wait for the executors to come up
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = 2,
+      timeout = 30000) // 30s
+
     val input = sc.parallelize(1 to 10)
     // Listen for the job
     val sem = new Semaphore(0)
@@ -56,9 +61,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
         sem.release()
       }
     })
-    TestUtils.waitUntilExecutorsUp(sc = sc,
-      numExecutors = 2,
-      timeout = 30000) // 30s
+
     val sleepyRdd = input.mapPartitions{ x =>
       Thread.sleep(5000) // 5s
       x
@@ -76,13 +79,5 @@ class WorkerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
     execs.foreach(execId => sched.decommissionExecutor(execId, 
ExecutorDecommissionInfo("", false)))
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
-    // Try and launch task after decommissioning, this should fail
-    val postDecommissioned = input.map(x => x)
-    val postDecomAsyncCount = postDecommissioned.countAsync()
-    val thrown = intercept[java.util.concurrent.TimeoutException]{
-      val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
-    }
-    assert(postDecomAsyncCount.isCompleted === false,
-      "After exec decommission new task could not launch")
   }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 6a52f72..25145da 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.storage
 
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
Semaphore}
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
Semaphore, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -69,9 +69,9 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, persist)
       .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
-      // Just replicate blocks as fast as we can during testing, there isn't 
another
+      // Just replicate blocks quickly during testing, there isn't another
       // workload we need to worry about.
-      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
 
     if (whenToDecom == TaskStarted) {
       // We are using accumulators below, make sure those are reported 
frequently.
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on executor which was decommissioned
-    
assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === 
Seq(),
+    assert(
+      !execIdToBlocksMapping.contains(execToDecommission) ||
+      execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === 
Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 
numParts)
     }
 
-    // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
-
-    // Wait for the executor to be removed
-    executorRemovedSem.acquire(1)
+    // Wait for the executor to be removed automatically after migration.
+    assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))
 
     // Since the RDD is cached or shuffled so further usage of same RDD should 
use the
     // cached data. Original RDD partitions should not be recomputed i.e. accum
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index 41b68d5..74ad8bd 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 
@@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite 
with Matchers {
   private val sparkConf = new SparkConf(false)
     .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
     .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
+    // Just replicate blocks quickly during testing, as there isn't another
+    // workload we need to worry about.
+    .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
 
   private def registerShuffleBlocks(
       mockMigratableShuffleResolver: MigratableResolver,
@@ -54,6 +57,113 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateDecommissionTimestamps(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decommissioning manager timestamps and status
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime: Option[Long] = None
+    try {
+      bmDecomManager.start()
+      eventually(timeout(100.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail) {
+          previousTime match {
+            case None =>
+              previousTime = Some(currentTime)
+              assert(false)
+            case Some(t) =>
+              assert(t < currentTime)
+          }
+        } else {
+          // If we expect migration to fail we should get the max value 
quickly.
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime.get)
+      }
+    } finally {
+      bmDecomManager.stop()
+    }
+  }
+
+  test("test that with no blocks we finish migration") {
+    // Set up the mocks so we return empty
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    when(migratableShuffleBlockResolver.getStoredShuffles())
+      .thenReturn(Seq())
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+    // Verify the decom manager handles this correctly
+    validateDecommissionTimestamps(sparkConf, bm, 
migratableShuffleBlockResolver)
+  }
+
+  test("block decom manager with no migrations configured") {
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+    val badConf = new SparkConf(false)
+      .set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, false)
+      .set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, false)
+      .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
+    // Verify the decom manager handles this correctly
+    validateDecommissionTimestamps(badConf, bm, migratableShuffleBlockResolver,
+      fail = true)
+  }
+
+  test("block decom manager with no peers") {
+    // Set up the mocks so we return one shuffle block
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq())
+
+    // Verify the decom manager handles this correctly
+    validateDecommissionTimestamps(sparkConf, bm, 
migratableShuffleBlockResolver,
+      fail = true)
+  }
+
+
+  test("block decom manager with only shuffle files time moves forward") {
+    // Set up the mocks so we return one shuffle block
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(BlockManagerId("exec2", "host2", 12345)))
+
+    // Verify the decom manager handles this correctly
+    validateDecommissionTimestamps(sparkConf, bm, 
migratableShuffleBlockResolver)
+  }
+
   test("test shuffle and cached rdd migration without any error") {
     val blockTransferService = mock(classOf[BlockTransferService])
     val bm = mock(classOf[BlockManager])
@@ -77,13 +187,36 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     try {
       bmDecomManager.start()
 
-      eventually(timeout(5.second), interval(10.milliseconds)) {
+      var previousRDDTime: Option[Long] = None
+      var previousShuffleTime: Option[Long] = None
+
+      // We don't check that all blocks are migrated because our mock is always returning an RDD.
+      eventually(timeout(100.second), interval(10.milliseconds)) {
         assert(bmDecomManager.shufflesToMigrate.isEmpty == true)
-        verify(bm, times(1)).replicateBlock(
+        verify(bm, least(1)).replicateBlock(
           mc.eq(storedBlockId1), mc.any(), mc.any(), mc.eq(Some(3)))
         verify(blockTransferService, times(2))
           .uploadBlockSync(mc.eq("host2"), mc.eq(bmPort), mc.eq("exec2"), 
mc.any(), mc.any(),
             mc.eq(StorageLevel.DISK_ONLY), mc.isNull())
+        // Since we never "finish" the RDD blocks, make sure the time is 
always moving forward.
+        assert(bmDecomManager.rddBlocksLeft)
+        previousRDDTime match {
+          case None =>
+            previousRDDTime = Some(bmDecomManager.lastRDDMigrationTime)
+            assert(false)
+          case Some(t) =>
+            assert(bmDecomManager.lastRDDMigrationTime > t)
+        }
+        // Since we do eventually finish the shuffle blocks make sure the 
shuffle blocks complete
+        // and that the time keeps moving forward.
+        assert(!bmDecomManager.shuffleBlocksLeft)
+        previousShuffleTime match {
+          case None =>
+            previousShuffleTime = Some(bmDecomManager.lastShuffleMigrationTime)
+            assert(false)
+          case Some(t) =>
+            assert(bmDecomManager.lastShuffleMigrationTime > t)
+        }
       }
     } finally {
         bmDecomManager.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
