Repository: spark
Updated Branches:
  refs/heads/master 1d00761b9 -> a250933c6


[SPARK-19803][CORE][TEST] Proactive replication test failures

## What changes were proposed in this pull request?
Executors cache a list of their peers that is refreshed, by default, every minute. Stale references from this cache were sometimes picked as replication targets even though the corresponding executors had already been removed from the master, so the re-replicated block never appeared in the block locations reported by the master and the test assertions failed. This is fixed by
1. Refreshing the peer cache in the block manager before attempting pro-active replication, so that a removed executor is no longer chosen as a replication target (see the sketch after this list).
2. Explicitly stopping the block managers in the tests, which shuts down the RPC endpoints they use. Even if a block manager still tries to replicate through a stale reference, the replication logic should then refresh the list of peers after the failure.
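
To make the peer-cache behaviour concrete, here is a minimal, self-contained sketch. It is not Spark's actual `BlockManager` code; `PeerCache` and `fetchFromMaster` are made-up names used only for illustration of a TTL-based cache with a `forceFetch` override:

```scala
// Illustrative sketch only (names are hypothetical): a peer list cached with a TTL,
// plus a forceFetch escape hatch so callers can drop stale references before replicating.
class PeerCache(fetchFromMaster: () => Seq[String], ttlMs: Long = 60L * 1000) {
  private var cachedPeers: Seq[String] = Seq.empty
  private var lastFetchMs: Long = 0L

  def getPeers(forceFetch: Boolean = false): Seq[String] = synchronized {
    val now = System.currentTimeMillis()
    if (forceFetch || cachedPeers.isEmpty || now - lastFetchMs > ttlMs) {
      // Re-query the master: executors that were removed no longer appear here.
      cachedPeers = fetchFromMaster()
      lastFetchMs = now
    }
    cachedPeers
  }
}
```

A caller that knows an executor was just removed asks for a forced refresh, `getPeers(forceFetch = true)`, which is what the `BlockManager` change below does right before `replicate(...)`.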

## How was this patch tested?
Tested manually
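
The suite changes below also replace fixed `Thread.sleep` waits with ScalaTest's `eventually`, which polls an assertion until it passes or the timeout expires. A minimal standalone sketch of that pattern, meant to live inside a test body (`currentReplicaCount` is a hypothetical stand-in for `master.getLocations(blockId).size` in the real suite):

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

// Hypothetical stand-in for querying the master for the block's current locations.
def currentReplicaCount(): Int = 3
val replicationFactor = 3

// Poll every 100 ms, for at most 5 seconds, until the assertion holds.
eventually(timeout(5.seconds), interval(100.millis)) {
  assert(currentReplicaCount() == replicationFactor)
}
```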

Author: Shubham Chopra <schopr...@bloomberg.net>
Author: Kay Ousterhout <kayousterh...@gmail.com>
Author: Shubham Chopra <shubhamcho...@users.noreply.github.com>

Closes #17325 from shubhamchopra/SPARK-19803.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a250933c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a250933c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a250933c

Branch: refs/heads/master
Commit: a250933c625ed720d15a0e479e9c51113605b102
Parents: 1d00761
Author: Shubham Chopra <schopr...@bloomberg.net>
Authored: Tue Mar 28 09:47:29 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Mar 28 09:47:29 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/storage/BlockInfoManager.scala |  6 ++++
 .../org/apache/spark/storage/BlockManager.scala |  6 +++-
 .../storage/BlockManagerReplicationSuite.scala  | 29 ++++++++++++--------
 3 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a250933c/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 490d45d..3db5983 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -371,6 +371,12 @@ private[storage] class BlockInfoManager extends Logging {
     blocksWithReleasedLocks
   }
 
+  /** Returns the number of locks held by the given task.  Used only for testing. */
+  private[storage] def getTaskLockCount(taskAttemptId: TaskAttemptId): Int = {
+    readLocksByTask.get(taskAttemptId).map(_.size()).getOrElse(0) +
+      writeLocksByTask.get(taskAttemptId).map(_.size).getOrElse(0)
+  }
+
   /**
    * Returns the number of blocks tracked.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/a250933c/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 245d94a..991346a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1187,7 +1187,7 @@ private[spark] class BlockManager(
       blockId: BlockId,
       existingReplicas: Set[BlockManagerId],
       maxReplicas: Int): Unit = {
-    logInfo(s"Pro-actively replicating $blockId")
+    logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
     blockInfoManager.lockForReading(blockId).foreach { info =>
       val data = doGetLocalBytes(blockId, info)
       val storageLevel = StorageLevel(
@@ -1196,9 +1196,13 @@ private[spark] class BlockManager(
         useOffHeap = info.level.useOffHeap,
         deserialized = info.level.deserialized,
         replication = maxReplicas)
+      // we know we are called as a result of an executor removal, so we refresh peer cache
+      // this way, we won't try to replicate to a missing executor with a stale reference
+      getPeers(forceFetch = true)
       try {
         replicate(blockId, data, storageLevel, info.classTag, existingReplicas)
       } finally {
+        logDebug(s"Releasing lock for $blockId")
         releaseLock(blockId)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a250933c/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index d907add..d5715f8 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -493,27 +493,34 @@ class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehav
     assert(blockLocations.size === replicationFactor)
 
     // remove a random blockManager
-    val executorsToRemove = blockLocations.take(replicationFactor - 1)
+    val executorsToRemove = blockLocations.take(replicationFactor - 1).toSet
     logInfo(s"Removing $executorsToRemove")
-    executorsToRemove.foreach{exec =>
-      master.removeExecutor(exec.executorId)
+    initialStores.filter(bm => executorsToRemove.contains(bm.blockManagerId)).foreach { bm =>
+      master.removeExecutor(bm.blockManagerId.executorId)
+      bm.stop()
       // giving enough time for replication to happen and new block be reported to master
-      Thread.sleep(200)
+      eventually(timeout(5 seconds), interval(100 millis)) {
+        val newLocations = master.getLocations(blockId).toSet
+        assert(newLocations.size === replicationFactor)
+      }
     }
 
-    val newLocations = eventually(timeout(5 seconds), interval(10 millis)) {
+    val newLocations = eventually(timeout(5 seconds), interval(100 millis)) {
       val _newLocations = master.getLocations(blockId).toSet
       assert(_newLocations.size === replicationFactor)
       _newLocations
     }
     logInfo(s"New locations : $newLocations")
-    // there should only be one common block manager between initial and new locations
-    assert(newLocations.intersect(blockLocations.toSet).size === 1)
 
-    // check if all the read locks have been released
-    initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
-      val locks = bm.releaseAllLocksForTask(BlockInfo.NON_TASK_WRITER)
-      assert(locks.size === 0, "Read locks unreleased!")
+    // new locations should not contain stopped block managers
+    assert(newLocations.forall(bmId => !executorsToRemove.contains(bmId)),
+      "New locations contain stopped block managers.")
+
+    // Make sure all locks have been released.
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      initialStores.filter(bm => newLocations.contains(bm.blockManagerId)).foreach { bm =>
+        assert(bm.blockInfoManager.getTaskLockCount(BlockInfo.NON_TASK_WRITER) === 0)
+      }
     }
   }
 }

