agrawaldevesh commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r465999198



##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -266,18 +266,17 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     val execIdToBlocksMapping = storageStatus.map(
       status => (status.blockManagerId.executorId, status.blocks)).toMap
     // No cached blocks should be present on executor which was decommissioned
-    
assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === 
Seq(),
+    assert(
+      !execIdToBlocksMapping.contains(execToDecommission) ||
+      execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === 
Seq(),
       "Cache blocks should be migrated")
     if (persist) {
       // There should still be all the RDD blocks cached
       assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 
numParts)
     }
 
-    // Make the executor we decommissioned exit
-    sched.client.killExecutors(List(execToDecommission))
-
-    // Wait for the executor to be removed
-    executorRemovedSem.acquire(1)
+    // Wait for the executor to be removed automatically after migration.
+    assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))

Review comment:
       Sounds good. Would it be okay to add a comment in the code itself about 
how long this usually takes, and mention that the larger timeout is for good 
measure?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -54,6 +57,106 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
     }
   }
 
+  /**
+   * Validate a given configuration with the mocks.
+   * The fail variable controls if we expect migration to fail, in which case 
we expect
+   * a constant Long.MaxValue timestamp.
+   */
+  private def validateWithMocks(conf: SparkConf, bm: BlockManager,
+      migratableShuffleBlockResolver: MigratableResolver, fail: Boolean = 
false) = {
+    // Verify the decom manager handles this correctly
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    var previousTime = Long.MaxValue
+    try {
+      bmDecomManager.start()
+      eventually(timeout(10.second), interval(10.milliseconds)) {
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done)
+        // Make sure the time stamp starts moving forward.
+        if (!fail && previousTime > currentTime) {
+          previousTime = currentTime
+          assert(false)
+        } else if (fail) {
+          assert(currentTime === Long.MaxValue)
+        }
+      }
+      if (!fail) {
+        // Wait 5 seconds and assert times keep moving forward.
+        Thread.sleep(5000)
+        val (currentTime, done) = bmDecomManager.lastMigrationInfo()
+        assert(done && currentTime > previousTime)
+      }
+    } finally {
+      bmDecomManager.stop()

Review comment:
       Sounds good!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to