spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master ae6cfb3ac -> d46d6246d


[SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

This patch addresses a race condition in DAGScheduler by properly synchronizing 
accesses to its `cacheLocs` map.

This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, 
which can be called by separate threads, since DAGScheduler's 
`getPreferredLocs()` method is called by SparkContext and indirectly calls 
`getCacheLocs()`.  If this map is cleared by the DAGScheduler event processing 
thread while a user thread is submitting a job and computing preferred 
locations, then this can cause the user thread to fail with 
`NoSuchElementException: key not found` errors.

Most accesses to DAGScheduler's internal state do not need synchronization 
because that state is only accessed from the event processing loop's thread.  
An alternative approach to fixing this bug would be to refactor this code so 
that SparkContext sends the DAGScheduler a message in order to get the list of 
preferred locations.  However, this would involve more extensive changes to 
this code and would be significantly harder to backport to maintenance branches 
since some of the related code has undergone significant refactoring (e.g. the 
introduction of EventLoop).  Since `cacheLocs` is the only state that's 
accessed in this way, adding simple synchronization seems like a better 
short-term fix.

See #3345 for additional context.
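
For illustration only, here is a minimal, self-contained Scala sketch of the pattern 
described above. The names (CacheLocsSketch, getCacheLocsRacy, rddId, numPartitions) are 
hypothetical and this is not the actual DAGScheduler code; it only shows how synchronizing 
on the map and using getOrElseUpdate closes the check-then-act window:

    import scala.collection.mutable.HashMap

    object CacheLocsSketch {
      // Stand-in for cacheLocs: RDD id -> per-partition cached locations.
      private val cacheLocs = new HashMap[Int, Array[Seq[String]]]

      // Pre-fix shape (racy): between contains() and apply(), another thread
      // may clear the map, so apply() can throw NoSuchElementException.
      def getCacheLocsRacy(rddId: Int, numPartitions: Int): Array[Seq[String]] = {
        if (!cacheLocs.contains(rddId)) {
          cacheLocs(rddId) = Array.fill(numPartitions)(Seq.empty[String])
        }
        cacheLocs(rddId)
      }

      // Post-fix shape: every access holds the map's monitor, and getOrElseUpdate
      // performs the lookup-or-insert as a single step under the lock.
      def getCacheLocs(rddId: Int, numPartitions: Int): Array[Seq[String]] =
        cacheLocs.synchronized {
          cacheLocs.getOrElseUpdate(rddId, Array.fill(numPartitions)(Seq.empty[String]))
        }

      def clearCacheLocs(): Unit = cacheLocs.synchronized {
        cacheLocs.clear()
      }
    }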

Author: Josh Rosen joshro...@databricks.com

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d46d6246
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d46d6246
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d46d6246

Branch: refs/heads/master
Commit: d46d6246d225ff3af09ebae1a09d4de2430c502d
Parents: ae6cfb3
Author: Josh Rosen joshro...@databricks.com
Authored: Tue Feb 17 17:39:58 2015 -0800
Committer: Patrick Wendell patr...@databricks.com
Committed: Tue Feb 17 17:39:58 2015 -0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 34 ++--
 1 file changed, 24 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d46d6246/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7903557..9c355d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -98,7 +98,13 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
-  // Contains the locations that each RDD's partitions are cached on
+  /**
+   * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
+   * and its values are arrays indexed by partition numbers. Each array value is the set of
+   * locations where that RDD partition is cached.
+   *
+   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   */
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
@@ -183,18 +189,17 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
-    if (!cacheLocs.contains(rdd.id)) {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
+    cacheLocs.getOrElseUpdate(rdd.id, {
       val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
-      cacheLocs(rdd.id) = blockIds.map { id =>
+      blockIds.map { id =>
         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
       }
-    }
-    cacheLocs(rdd.id)
+    })
   }
 
-  private def clearCacheLocs() {
+  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
     cacheLocs.clear()
   }
 
@@ -1276,17 +1281,26 @@ class DAGScheduler(
   }
 
   /**
-   * Synchronized method that might be called from other threads.
+   * Gets the locality information associated with a partition of a particular RDD.
+   *
+   * This method is thread-safe and is called from both DAGScheduler and SparkContext.
+   *
    * @param rdd whose partitions are to be looked at
    * @param partition to lookup 

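As a rough way to exercise the sketch shown earlier (again purely illustrative and not part 
of the patch; the CacheLocsDemo name and loop counts are arbitrary), a reader thread and a 
clearing thread can call the two synchronized methods concurrently. The run completes 
without the "key not found" failure that the unsynchronized variant can eventually hit:

    object CacheLocsDemo {
      def main(args: Array[String]): Unit = {
        // One thread repeatedly resolves cache locations, as a user thread
        // computing preferred locations would...
        val reader = new Thread(new Runnable {
          def run(): Unit = {
            for (_ <- 1 to 100000) {
              CacheLocsSketch.getCacheLocs(rddId = 1, numPartitions = 4)
            }
          }
        })
        // ...while another repeatedly clears the map, as the event processing
        // loop does via clearCacheLocs().
        val clearer = new Thread(new Runnable {
          def run(): Unit = {
            for (_ <- 1 to 100000) {
              CacheLocsSketch.clearCacheLocs()
            }
          }
        })
        reader.start(); clearer.start()
        reader.join(); clearer.join()
        println("finished without NoSuchElementException")
      }
    }
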
spark git commit: [SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

2015-02-17 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 cb905841b -> 07a401a7b


[SPARK-4454] Properly synchronize accesses to DAGScheduler cacheLocs map

This patch addresses a race condition in DAGScheduler by properly synchronizing 
accesses to its `cacheLocs` map.

This map is accessed by the `getCacheLocs` and `clearCacheLocs()` methods, 
which can be called by separate threads, since DAGScheduler's 
`getPreferredLocs()` method is called by SparkContext and indirectly calls 
`getCacheLocs()`.  If this map is cleared by the DAGScheduler event processing 
thread while a user thread is submitting a job and computing preferred 
locations, then this can cause the user thread to fail with 
`NoSuchElementException: key not found` errors.

Most accesses to DAGScheduler's internal state do not need synchronization 
because that state is only accessed from the event processing loop's thread.  
An alternative approach to fixing this bug would be to refactor this code so 
that SparkContext sends the DAGScheduler a message in order to get the list of 
preferred locations.  However, this would involve more extensive changes to 
this code and would be significantly harder to backport to maintenance branches 
since some of the related code has undergone significant refactoring (e.g. the 
introduction of EventLoop).  Since `cacheLocs` is the only state that's 
accessed in this way, adding simple synchronization seems like a better 
short-term fix.

See #3345 for additional context.

Author: Josh Rosen joshro...@databricks.com

Closes #4660 from JoshRosen/SPARK-4454 and squashes the following commits:

12d64ba [Josh Rosen] Properly synchronize accesses to DAGScheduler cacheLocs map.

(cherry picked from commit d46d6246d225ff3af09ebae1a09d4de2430c502d)
Signed-off-by: Patrick Wendell patr...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07a401a7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07a401a7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07a401a7

Branch: refs/heads/branch-1.3
Commit: 07a401a7beea864092ec8f8c451e05cba5a19bbb
Parents: cb90584
Author: Josh Rosen joshro...@databricks.com
Authored: Tue Feb 17 17:39:58 2015 -0800
Committer: Patrick Wendell patr...@databricks.com
Committed: Tue Feb 17 17:40:04 2015 -0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 34 ++--
 1 file changed, 24 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07a401a7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7903557..9c355d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -98,7 +98,13 @@ class DAGScheduler(
 
   private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
-  // Contains the locations that each RDD's partitions are cached on
+  /**
+   * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
+   * and its values are arrays indexed by partition numbers. Each array value is the set of
+   * locations where that RDD partition is cached.
+   *
+   * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   */
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
 
   // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
@@ -183,18 +189,17 @@ class DAGScheduler(
     eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
-  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
-    if (!cacheLocs.contains(rdd.id)) {
+  private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = cacheLocs.synchronized {
+    cacheLocs.getOrElseUpdate(rdd.id, {
       val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
-      cacheLocs(rdd.id) = blockIds.map { id =>
+      blockIds.map { id =>
         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
       }
-    }
-    cacheLocs(rdd.id)
+    })
   }
 
-  private def clearCacheLocs() {
+  private def clearCacheLocs(): Unit = cacheLocs.synchronized {
     cacheLocs.clear()
   }
 
@@ -1276,17 +1281,26 @@ class DAGScheduler(
   }
 
   /**
-   * Synchronized method that might be called from other threads.
+   * Gets the locality information associated with a partition of a particular RDD.
+   *
+   * This method is thread-safe and is called