Repository: spark
Updated Branches:
  refs/heads/master 2c5af7d4d -> cbff2803e


[SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs

## What changes were proposed in this pull request?

If one thread schedules a job that depends on an RDD whose shuffle is
still executing in another thread, Spark throws a NullPointerException.
This patch synchronizes on each shuffle's status array in `mapStatuses` and
skips null status entries (map outputs whose shuffle tasks are still in progress).
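
The pattern can be illustrated outside Spark with a minimal sketch. The names `Status` and `NullSafeLocations` are hypothetical stand-ins rather than Spark classes, locations are plain strings, and the sketch assumes that writers lock the same per-shuffle array that readers lock:

```scala
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.HashMap

// Hypothetical stand-in for a map status: a location plus one block size per reducer.
final case class Status(location: String, sizes: Array[Long]) {
  def getSizeForBlock(reducerId: Int): Long = sizes(reducerId)
}

object NullSafeLocations {
  // shuffleId -> one slot per map task; a slot stays null until its output is registered.
  private val mapStatuses = TrieMap.empty[Int, Array[Status]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    mapStatuses.put(shuffleId, new Array[Status](numMaps))

  // Assumption of this sketch: the writer takes the same lock as the reader below.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: Status): Unit = {
    val statuses = mapStatuses(shuffleId)
    statuses.synchronized { statuses(mapId) = status }
  }

  def locationsWithLargestOutputs(
      shuffleId: Int,
      reducerId: Int,
      fractionThreshold: Double): Option[Array[String]] = {
    mapStatuses.get(shuffleId) match {
      case None => None
      case Some(statuses) =>
        statuses.synchronized {
          val locs = new HashMap[String, Long]
          var totalOutputSize = 0L
          // Skip slots that are still null, i.e. map tasks that have not finished yet.
          for (status <- statuses if status != null) {
            val blockSize = status.getSizeForBlock(reducerId)
            if (blockSize > 0) {
              locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
              totalOutputSize += blockSize
            }
          }
          val topLocs = locs.filter { case (_, size) =>
            size.toDouble / totalOutputSize >= fractionThreshold
          }
          if (topLocs.nonEmpty) Some(topLocs.keys.toArray) else None
        }
    }
  }
}
```

In this sketch, a lookup made between `registerShuffle` and the first `registerMapOutput` sees only null slots and returns `None` instead of dereferencing a null entry.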

## How was this patch tested?

Our client code unit test suite, which was reliably reproducing the race
condition with 10 threads, shows that this fixes it. I have not found a minimal
test case to add to Spark, but I will attempt to do so if desired.

The same test case was tripping up on SPARK-4454, which was fixed by
making other DAGScheduler code thread-safe.

shivaram srowen

Author: Andy Sloane <aslo...@tetrationanalytics.com>

Closes #11505 from a1k0n/SPARK-13631.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbff2803
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbff2803
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbff2803

Branch: refs/heads/master
Commit: cbff2803ef117d7cffe6f05fc1bbd395a1e9c587
Parents: 2c5af7d
Author: Andy Sloane <aslo...@tetrationanalytics.com>
Authored: Wed Mar 9 10:25:47 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Mar 9 10:25:47 2016 +0000

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     | 52 +++++++++++---------
 1 file changed, 29 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cbff2803/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index eb2fdec..9cb6159 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -376,8 +376,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
    * @param numReducers total number of reducers in the shuffle
    * @param fractionThreshold fraction of total map output size that a location must have
    *                          for it to be considered large.
-   *
-   * This method is not thread-safe.
    */
   def getLocationsWithLargestOutputs(
       shuffleId: Int,
@@ -386,28 +384,36 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {
 
-    if (mapStatuses.contains(shuffleId)) {
-      val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty) {
-        // HashMap to add up sizes of all blocks at the same location
-        val locs = new HashMap[BlockManagerId, Long]
-        var totalOutputSize = 0L
-        var mapIdx = 0
-        while (mapIdx < statuses.length) {
-          val status = statuses(mapIdx)
-          val blockSize = status.getSizeForBlock(reducerId)
-          if (blockSize > 0) {
-            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
-            totalOutputSize += blockSize
+    val statuses = mapStatuses.get(shuffleId).orNull
+    if (statuses != null) {
+      statuses.synchronized {
+        if (statuses.nonEmpty) {
+          // HashMap to add up sizes of all blocks at the same location
+          val locs = new HashMap[BlockManagerId, Long]
+          var totalOutputSize = 0L
+          var mapIdx = 0
+          while (mapIdx < statuses.length) {
+            val status = statuses(mapIdx)
+            // status may be null here if we are called between registerShuffle, which creates an
+            // array with null entries for each output, and registerMapOutputs, which populates it
+            // with valid status entries. This is possible if one thread schedules a job which
+            // depends on an RDD which is currently being computed by another thread.
+            if (status != null) {
+              val blockSize = status.getSizeForBlock(reducerId)
+              if (blockSize > 0) {
+                locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+                totalOutputSize += blockSize
+              }
+            }
+            mapIdx = mapIdx + 1
+          }
+          val topLocs = locs.filter { case (loc, size) =>
+            size.toDouble / totalOutputSize >= fractionThreshold
+          }
+          // Return if we have any locations which satisfy the required threshold
+          if (topLocs.nonEmpty) {
+            return Some(topLocs.keys.toArray)
           }
-          mapIdx = mapIdx + 1
-        }
-        val topLocs = locs.filter { case (loc, size) =>
-          size.toDouble / totalOutputSize >= fractionThreshold
-        }
-        // Return if we have any locations which satisfy the required threshold
-        if (topLocs.nonEmpty) {
-          return Some(topLocs.map(_._1).toArray)
         }
       }
     }


