[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-02 Thread Andy Sloane (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177261#comment-15177261 ]

Andy Sloane commented on SPARK-13631:
--------------------------------------

Did some digging with git bisect.

It turns out to be directly linked to {{spark.shuffle.reduceLocality.enabled}}. 
The difference between Spark 1.6 and 1.5 here is that 1.5 has it {{false}} by 
default, and 1.6 has it {{true}} by default.

Setting it to {{false}} cures this in 1.6, and setting it to {{true}} causes it 
to re-emerge in 1.5.
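
For anyone else hitting this, here's a minimal sketch of the client-side 
workaround (the config key is the real one discussed above; the app name and 
context setup are just illustrative):

{code}
import org.apache.spark.{SparkConf, SparkContext}

// Workaround sketch: restore the Spark 1.5 default so the scheduler skips the
// racy reduce-locality path (getLocationsWithLargestOutputs) entirely.
val conf = new SparkConf()
  .setAppName("reduce-locality-workaround") // illustrative name
  .set("spark.shuffle.reduceLocality.enabled", "false")
val sc = new SparkContext(conf)
{code}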


> getPreferredLocations race condition in spark 1.6.0?
> ----------------------------------------------------
>
> Key: SPARK-13631
> URL: https://issues.apache.org/jira/browse/SPARK-13631
> Project: Spark
>  Issue Type: Bug
>  Components: Scheduler
>Affects Versions: 1.6.0
>Reporter: Andy Sloane
>
> We are seeing something that looks a lot like a regression from Spark 1.2. 
> When we run jobs with multiple threads, we get a crash somewhere inside 
> getPreferredLocations, much like the one fixed in SPARK-4454, except now it's 
> inside org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs 
> instead of DAGScheduler directly.
> I tried Spark 1.2 post-SPARK-4454 (before that patch it was only slightly 
> flaky), 1.4.1, and 1.5.2, and all are fine. 1.6.0 immediately crashes on our 
> threaded test case, though once in a while it passes.
> The stack trace is huge, but starts like this:
> Caused by: java.lang.NullPointerException: null
>   at org.apache.spark.MapOutputTrackerMaster.getLocationsWithLargestOutputs(MapOutputTracker.scala:406)
>   at org.apache.spark.MapOutputTrackerMaster.getPreferredLocationsForShuffle(MapOutputTracker.scala:366)
>   at org.apache.spark.rdd.ShuffledRDD.getPreferredLocations(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
>   at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:257)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:256)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1545)
> The full trace is available here:
> https://gist.github.com/andy256/97611f19924bbf65cf49






[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-03 Thread Andy Sloane (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177502#comment-15177502 ]

Andy Sloane commented on SPARK-13631:
--------------------------------------

Further bisecting with the reduceLocality flag forced to true confirms it: 
SPARK-2774 (commit 96a7c888d806adfdb2c722025a1079ed7eaa2052) introduced the 
function that's failing here, with a note that it's known not to be thread-safe. 
But it seems we're calling it in a multithreaded context, and when the exception 
occurs, {{mapStatuses}} contains an entry for the shuffle, but that entry is an 
array full of nulls (my guess is that {{registerShuffle}} was called but none of 
the map outputs have been registered yet).
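
To make the failure mode concrete, here's a stand-alone toy sketch of the 
interleaving (hypothetical names, not the real MapOutputTracker; it only 
mirrors the null-filled-array state described above):

{code}
import java.util.concurrent.ConcurrentHashMap

// Toy stand-in for MapOutputTrackerMaster's state: registerShuffle publishes
// an all-null array first; registerMapOutputs fills it in later.
object RaceSketch {
  final case class Status(location: String, size: Long)
  private val mapStatuses = new ConcurrentHashMap[Int, Array[Status]]()

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    mapStatuses.put(shuffleId, new Array[Status](numMaps)) // all-null placeholder

  def registerMapOutputs(shuffleId: Int, statuses: Array[Status]): Unit =
    mapStatuses.put(shuffleId, statuses)

  // Mirrors the unguarded element access in getLocationsWithLargestOutputs.
  def largestOutputLoc(shuffleId: Int): Option[String] = {
    val statuses = mapStatuses.get(shuffleId)
    if (statuses == null || statuses.isEmpty) None
    else Some(statuses.maxBy(_.size).location) // NPE if a slot is still null
  }

  def main(args: Array[String]): Unit = {
    registerShuffle(0, numMaps = 1)
    // A second scheduling thread calling largestOutputLoc(0) right here sees
    // the placeholder and throws NullPointerException, as in the stack trace.
    largestOutputLoc(0)
    registerMapOutputs(0, Array(Status("host1", 100L)))
  }
}
{code}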

So the following sort-of-lame patch seems to fix it, but I'm not totally sure 
about its correctness, or whether we're doing something as Spark clients that's 
getting us into a bad state (we have several threads scheduling jobs 
concurrently):

{code}
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 72355cd..c0f1a36 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -394,28 +394,32 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
       fractionThreshold: Double)
     : Option[Array[BlockManagerId]] = {
 
-    if (mapStatuses.contains(shuffleId)) {
-      val statuses = mapStatuses(shuffleId)
-      if (statuses.nonEmpty) {
-        // HashMap to add up sizes of all blocks at the same location
-        val locs = new HashMap[BlockManagerId, Long]
-        var totalOutputSize = 0L
-        var mapIdx = 0
-        while (mapIdx < statuses.length) {
-          val status = statuses(mapIdx)
-          val blockSize = status.getSizeForBlock(reducerId)
-          if (blockSize > 0) {
-            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
-            totalOutputSize += blockSize
+    val statuses = mapStatuses.get(shuffleId).orNull
+    if (statuses != null) {
+      statuses.synchronized {
+        if (statuses.nonEmpty) {
+          // HashMap to add up sizes of all blocks at the same location
+          val locs = new HashMap[BlockManagerId, Long]
+          var totalOutputSize = 0L
+          var mapIdx = 0
+          while (mapIdx < statuses.length) {
+            val status = statuses(mapIdx)
+            if (status != null) {
+              val blockSize = status.getSizeForBlock(reducerId)
+              if (blockSize > 0) {
+                locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+                totalOutputSize += blockSize
+              }
+            }
+            mapIdx = mapIdx + 1
+          }
+          val topLocs = locs.filter { case (loc, size) =>
+            size.toDouble / totalOutputSize >= fractionThreshold
+          }
+          // Return if we have any locations which satisfy the required threshold
+          if (topLocs.nonEmpty) {
+            return Some(topLocs.map(_._1).toArray)
           }
-          mapIdx = mapIdx + 1
-        }
-        val topLocs = locs.filter { case (loc, size) =>
-          size.toDouble / totalOutputSize >= fractionThreshold
-        }
-        // Return if we have any locations which satisfy the required threshold
-        if (topLocs.nonEmpty) {
-          return Some(topLocs.map(_._1).toArray)
         }
       }
     }
{code}
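
For what it's worth, the shape of the change: the {{contains(shuffleId)}} / 
{{apply(shuffleId)}} pair becomes a single {{mapStatuses.get(shuffleId).orNull}}, 
so the map is consulted exactly once; each element is then null-checked, since 
{{registerShuffle}} pre-fills the array with nulls before any map output 
arrives; and the {{statuses.synchronized}} block should (if I'm reading the 
code right) coordinate with {{registerMapOutput}}, which writes individual 
slots while holding the same array lock.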


[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-03 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177508#comment-15177508 ]

Sean Owen commented on SPARK-13631:
------------------------------------

(Propose it as a pull request)




[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-03 Thread Andy Sloane (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177893#comment-15177893 ]

Andy Sloane commented on SPARK-13631:
--------------------------------------

I will... It was late and I was brain-dumping.




[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-03 Thread Andy Sloane (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15178710#comment-15178710 ]

Andy Sloane commented on SPARK-13631:
--------------------------------------

Did a little more testing, added some logs, and confirmed my latest hypothesis:

{code}
16/03/03 11:26:23 INFO MapOutputTrackerMaster: Registering shuffle 18 with 1 empty maps (registerShuffle)
16/03/03 11:26:23 INFO MapOutputTrackerMaster: getting locations for shuffle 18 reducer 0/1 (getLocationsWithLargestOutputs)
16/03/03 11:26:23 INFO MapOutputTrackerMaster: Registering shuffle 18 with 1 map outputs (registerMapOutputs)
16/03/03 11:26:23 INFO MapOutputTrackerMaster: getting locations for shuffle 18 reducer 0/1 (getLocationsWithLargestOutputs)
{code}

As I suspected, the call to {{getLocationsWithLargestOutputs}} in one thread is 
getting interleaved between {{registerShuffle}} and {{registerMapOutputs}} in 
another thread. I think there are two jobs which both depend on the output of 
the same shuffle stage, and each is waiting for that stage to be marked 
finished.

So there might be a better fix in addition to the above: prevent the first task 
from attempting to schedule before the map outputs are registered. I noticed 
that DAGScheduler calls {{markStageAsFinished}} before calling 
{{registerMapOutputs}}, but switching the order doesn't seem to help. Still 
trying to understand the sequence of events here.
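
Purely as an illustration of that direction, here's a hedged sketch of a 
"publish only when complete" tracker, with hypothetical names rather than the 
real DAGScheduler/MapOutputTracker plumbing (the real fix would also have to 
preserve incremental, per-task output registration):

{code}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch: keep the in-progress statuses private and expose the
// array to readers only once every slot is filled, so a concurrently
// scheduled job can never observe the null-filled placeholder.
class PublishWhenCompleteTracker {
  private val pending = new ConcurrentHashMap[Int, Array[String]]()   // being filled
  private val published = new ConcurrentHashMap[Int, Array[String]]() // complete only

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    pending.put(shuffleId, new Array[String](numMaps))

  def registerMapOutput(shuffleId: Int, mapId: Int, loc: String): Unit = {
    val arr = pending.get(shuffleId) // assumes registerShuffle ran first
    arr.synchronized {
      arr(mapId) = loc
      if (arr.forall(_ != null)) published.put(shuffleId, arr) // atomic publish
    }
  }

  // Readers see either nothing or a fully populated array, never nulls.
  def locations(shuffleId: Int): Option[Array[String]] =
    Option(published.get(shuffleId))
}
{code}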




[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-03 Thread Andy Sloane (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15178980#comment-15178980 ]

Andy Sloane commented on SPARK-13631:
--------------------------------------

Hm, I see. It's just a coincidence that a thread from the pool tends to get 
scheduled while the shuffle task is running and tries to schedule a new job 
based on the locations of the (currently in-progress) shuffle tasks. There's no 
blocking wait on an RDD causing this job to kick off; it's just trying to 
schedule it.

It seems like we'd want to somehow defer finding preferred locations for an RDD 
that is currently being computed, but job planning happens completely up front, 
and an individual RDD carries no indicator that it's presently being computed. 
That means we are probably recomputing RDDs unnecessarily in this multithreaded 
scheme.






[jira] [Commented] (SPARK-13631) getPreferredLocations race condition in spark 1.6.0?

2016-03-03 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-13631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15179019#comment-15179019 ]

Apache Spark commented on SPARK-13631:
---------------------------------------

User 'a1k0n' has created a pull request for this issue:
https://github.com/apache/spark/pull/11505
