Repository: spark Updated Branches: refs/heads/branch-1.6 8ec4f159a -> 95105b0e6
[SPARK-13631][CORE] Thread-safe getLocationsWithLargestOutputs ## What changes were proposed in this pull request? If one thread schedules a job that depends on an RDD whose shuffle is still being executed by another thread, Spark would throw a NullPointerException. This patch synchronizes access to each shuffle's status array in `mapStatuses` and skips null status entries (which belong to shuffle tasks that are still in progress). ## How was this patch tested? Our client code unit test suite, which was reliably reproducing the race condition with 10 threads, shows that this fixes it. I have not found a minimal test case to add to Spark, but I will attempt to do so if desired. The same test case was tripping up on SPARK-4454, which was fixed by making other DAGScheduler code thread-safe. shivaram srowen Author: Andy Sloane <aslo...@tetrationanalytics.com> Closes #11505 from a1k0n/SPARK-13631. (cherry picked from commit cbff2803ef117d7cffe6f05fc1bbd395a1e9c587) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95105b0e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95105b0e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95105b0e Branch: refs/heads/branch-1.6 Commit: 95105b0e6e38f5f13f41b06695c0b059ff911a44 Parents: 8ec4f15 Author: Andy Sloane <aslo...@tetrationanalytics.com> Authored: Wed Mar 9 10:25:47 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Mar 9 10:26:00 2016 +0000 ---------------------------------------------------------------------- .../org/apache/spark/MapOutputTracker.scala | 52 +++++++++++--------- 1 file changed, 29 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/95105b0e/core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 72355cd..998b4d5 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -384,8 +384,6 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) * @param numReducers total number of reducers in the shuffle * @param fractionThreshold fraction of total map output size that a location must have * for it to be considered large. - * - * This method is not thread-safe. */ def getLocationsWithLargestOutputs( shuffleId: Int, @@ -394,28 +392,36 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) fractionThreshold: Double) : Option[Array[BlockManagerId]] = { - if (mapStatuses.contains(shuffleId)) { - val statuses = mapStatuses(shuffleId) - if (statuses.nonEmpty) { - // HashMap to add up sizes of all blocks at the same location - val locs = new HashMap[BlockManagerId, Long] - var totalOutputSize = 0L - var mapIdx = 0 - while (mapIdx < statuses.length) { - val status = statuses(mapIdx) - val blockSize = status.getSizeForBlock(reducerId) - if (blockSize > 0) { - locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize - totalOutputSize += blockSize + val statuses = mapStatuses.get(shuffleId).orNull + if (statuses != null) { + statuses.synchronized { + if (statuses.nonEmpty) { + // HashMap to add up sizes of all blocks at the same location + val locs = new HashMap[BlockManagerId, Long] + var totalOutputSize = 0L + var mapIdx = 0 + while (mapIdx < statuses.length) { + val status = statuses(mapIdx) + // status may be null here if we are called between registerShuffle, which creates an + // array with null entries for each output, and registerMapOutputs, which populates it + // with valid status entries. This is possible if one thread schedules a job which + // depends on an RDD which is currently being computed by another thread. 
+ if (status != null) { + val blockSize = status.getSizeForBlock(reducerId) + if (blockSize > 0) { + locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize + totalOutputSize += blockSize + } + } + mapIdx = mapIdx + 1 + } + val topLocs = locs.filter { case (loc, size) => + size.toDouble / totalOutputSize >= fractionThreshold + } + // Return if we have any locations which satisfy the required threshold + if (topLocs.nonEmpty) { + return Some(topLocs.keys.toArray) } - mapIdx = mapIdx + 1 - } - val topLocs = locs.filter { case (loc, size) => - size.toDouble / totalOutputSize >= fractionThreshold - } - // Return if we have any locations which satisfy the required threshold - if (topLocs.nonEmpty) { - return Some(topLocs.map(_._1).toArray) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org