[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177502#comment-15177502 ]

Andy Sloane commented on SPARK-13631:
-------------------------------------

Further bisecting with the reduceLocality flag forced to true confirmed it: 
SPARK-2774 / commit 96a7c888d806adfdb2c722025a1079ed7eaa2052 introduced the 
function that's failing here, with a note that it's known not to be thread 
safe. But it seems we're calling it in a multithreaded context, and when the 
exception occurs, mapStatuses contains an entry for the shuffle, but it's an 
array full of nulls (I am guessing that's because {{registerShuffle}} was 
called but none of the map outputs have been registered yet).
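
To make that window concrete, here's a minimal, self-contained sketch. The 
names ({{FakeMapStatus}} and friends) are made up for illustration; this only 
mimics the registerShuffle-then-registerMapOutput sequence as I understand it, 
not Spark's actual code:

{code}
import java.util.concurrent.ConcurrentHashMap

// Made-up stand-ins, just to mimic the registration sequence; Spark's real
// MapStatus and tracker map are more involved.
case class FakeMapStatus(location: String, blockSize: Long)

object RegistrationRaceSketch {
  private val mapStatuses = new ConcurrentHashMap[Int, Array[FakeMapStatus]]()

  // registerShuffle-style step: from here on the map *contains* the shuffle
  // id, but every slot of the status array is still null.
  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    mapStatuses.put(shuffleId, new Array[FakeMapStatus](numMaps))

  // registerMapOutput-style step: slots get filled in later, one per
  // finished map task.
  def registerMapOutput(shuffleId: Int, mapId: Int, s: FakeMapStatus): Unit =
    mapStatuses.get(shuffleId)(mapId) = s

  def main(args: Array[String]): Unit = {
    registerShuffle(0, numMaps = 4)
    // A reader in the window between the two steps passes the contains()
    // check but then dereferences a null slot, like the NPE at
    // MapOutputTracker.scala:406 in our stack trace:
    mapStatuses.get(0).foreach(s => println(s.blockSize))
  }
}
{code}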

So, the following somewhat-lame patch seems to fix it, but I'm not totally 
sure about its correctness, or whether we're doing something as Spark clients 
that gets us into a bad state (we have several threads running scheduling 
jobs concurrently):

{code}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 72355cd..c0f1a36 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -394,28 +394,32 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {
 
-    if (mapStatuses.contains(shuffleId)) {
-      val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty) {
-        // HashMap to add up sizes of all blocks at the same location
-        val locs = new HashMap[BlockManagerId, Long]
-        var totalOutputSize = 0L
-        var mapIdx = 0
-        while (mapIdx < statuses.length) {
-          val status = statuses(mapIdx)
-          val blockSize = status.getSizeForBlock(reducerId)
-          if (blockSize > 0) {
-            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
-            totalOutputSize += blockSize
+    val statuses = mapStatuses.get(shuffleId).orNull
+    if (statuses != null) {
+      statuses.synchronized {
+        if (statuses.nonEmpty) {
+          // HashMap to add up sizes of all blocks at the same location
+          val locs = new HashMap[BlockManagerId, Long]
+          var totalOutputSize = 0L
+          var mapIdx = 0
+          while (mapIdx < statuses.length) {
+            val status = statuses(mapIdx)
+            if (status != null) {
+              val blockSize = status.getSizeForBlock(reducerId)
+              if (blockSize > 0) {
+                locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+                totalOutputSize += blockSize
+              }
+            }
+            mapIdx = mapIdx + 1
+          }
+          val topLocs = locs.filter { case (loc, size) =>
+            size.toDouble / totalOutputSize >= fractionThreshold
+          }
+          // Return if we have any locations which satisfy the required threshold
+          if (topLocs.nonEmpty) {
+            return Some(topLocs.map(_._1).toArray)
           }
-          mapIdx = mapIdx + 1
-        }
-        val topLocs = locs.filter { case (loc, size) =>
-          size.toDouble / totalOutputSize >= fractionThreshold
-        }
-        // Return if we have any locations which satisfy the required threshold
-        if (topLocs.nonEmpty) {
-          return Some(topLocs.map(_._1).toArray)
-        }
         }
       }
     }
{code}
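
For what it's worth, the {{statuses.synchronized}} wrapper mirrors how the 
array is written elsewhere ({{registerMapOutput}} appears to update slots 
while holding the same array's lock), and the null check just skips map 
outputs that haven't been registered yet instead of waiting for them, so the 
locality estimate is computed from whatever outputs exist at call time.

In case a repro sketch helps, the shape of our client code is roughly the 
following (a schematic only, not our actual test case; the local master, 
thread count, and job bodies are made up, and whether it trips the NPE will 
depend on timing):

{code}
import org.apache.spark.{SparkConf, SparkContext}

object ThreadedShuffleRepro {
  def main(args: Array[String]): Unit = {
    // One shared driver-side SparkContext; several threads schedule jobs on
    // it concurrently, the way our test harness does.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("SPARK-13631-repro"))

    // All threads run jobs over the same shuffle dependency, so one thread
    // can ask for the shuffle's preferred locations while its map outputs
    // are still being (re)registered on behalf of another job.
    val shuffled = sc.parallelize(1 to 100000, 16)
      .map(x => (x % 100, x))
      .reduceByKey(_ + _)

    val threads = (1 to 8).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = { shuffled.count() }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}
{code}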


> getPreferredLocations race condition in spark 1.6.0?
> ----------------------------------------------------
>
>                 Key: SPARK-13631
>                 URL: https://issues.apache.org/jira/browse/SPARK-13631
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 1.6.0
>            Reporter: Andy Sloane
>
> We are seeing something that looks a lot like a regression from Spark 1.2. 
> When we run jobs with multiple threads, we get a crash somewhere inside 
> getPreferredLocations, as was fixed in SPARK-4454, except now it's inside 
> org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs 
> instead of in DAGScheduler directly.
> I tried Spark 1.2 post-SPARK-4454 (before that patch it's only slightly 
> flaky), 1.4.1, and 1.5.2, and all are fine. 1.6.0 immediately crashes on our 
> threaded test case, though once in a while it passes.
> The stack trace is huge, but starts like this:
> Caused by: java.lang.NullPointerException: null
>       at org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs(MapOutputTracker.scala:406)
>       at org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:366)
>       at org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
>       at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
>       at scala.Option.getOrElse(Option.scala:120)
>       at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
>       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
> The full trace is available here:
> https://gist.github.com/andy256/97611f19924bbf65cf49


