[jira] [Updated] (SPARK-14178) DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.

2016-03-30 Thread Sean Owen (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-14178:
--
Target Version/s:   (was: 1.6.1, 2.0.0)

> DAGScheduler should get map output statuses directly, not by 
> MapOutputTrackerMaster.getSerializedMapOutputStatuses.
> ---
>
> Key: SPARK-14178
> URL: https://issues.apache.org/jira/browse/SPARK-14178
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: Guoqiang Li
>
> DAGScheduler gets map output statuses via 
> {{MapOutputTrackerMaster.getSerializedMapOutputStatuses}} and must then 
> deserialize them.
> [DAGScheduler.scala#L357 | 
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357]
> {noformat}
>   private def newOrUsedShuffleStage(
>       shuffleDep: ShuffleDependency[_, _, _],
>       firstJobId: Int): ShuffleMapStage = {
>     val rdd = shuffleDep.rdd
>     val numTasks = rdd.partitions.length
>     val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
>     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
>       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
>       // Deserialization is very time-consuming.
>       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
>       (0 until locs.length).foreach { i =>
>         if (locs(i) ne null) {
>           // locs(i) will be null if missing
>           stage.addOutputLoc(i, locs(i))
>         }
>       }
>     } else {
>       // Kind of ugly: need to register RDDs with the cache and map output tracker here
>       // since we can't do it in the RDD constructor because # of partitions is unknown
>       logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
>       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
>     }
>     stage
>   }
> {noformat}
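
The round trip described above can be illustrated with a small, self-contained sketch. Everything here is a hypothetical stand-in rather than Spark's API: ToyMapStatus, ToyCodec, ToyMapOutputTracker and the direct accessor getMapOutputStatuses are invented for illustration, and the hand-rolled codec only approximates the cost of the real serialization step. The sketch shows that a caller living in the same process as the tracker can read the in-memory array and get the same output locations without encoding and decoding them first.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

// Toy stand-in for Spark's MapStatus (hypothetical, not the real class).
final case class ToyMapStatus(host: String, sizes: Vector[Long])

// Hand-rolled codec standing in for the tracker's serialization step (assumption).
object ToyCodec {
  def serialize(statuses: Array[ToyMapStatus]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(statuses.length)
    statuses.foreach { s =>
      out.writeBoolean(s ne null) // a missing map output is stored as null
      if (s ne null) {
        out.writeUTF(s.host)
        out.writeInt(s.sizes.length)
        s.sizes.foreach(out.writeLong)
      }
    }
    out.flush()
    bytes.toByteArray
  }

  def deserialize(data: Array[Byte]): Array[ToyMapStatus] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    Array.fill[ToyMapStatus](in.readInt()) {
      if (in.readBoolean()) {
        val host = in.readUTF()
        val n = in.readInt()
        ToyMapStatus(host, Vector.fill(n)(in.readLong()))
      } else null
    }
  }
}

// Toy stand-in for MapOutputTrackerMaster; method names are illustrative only.
final class ToyMapOutputTracker {
  private val statuses = mutable.Map.empty[Int, Array[ToyMapStatus]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    statuses(shuffleId) = new Array[ToyMapStatus](numMaps)

  def registerMapOutput(shuffleId: Int, mapId: Int, status: ToyMapStatus): Unit =
    statuses(shuffleId)(mapId) = status

  // Path used in the snippet above: encode to bytes, caller decodes again.
  def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] =
    ToyCodec.serialize(statuses(shuffleId))

  // Hypothetical direct accessor: hands back a copy of the in-memory array.
  def getMapOutputStatuses(shuffleId: Int): Array[ToyMapStatus] =
    statuses(shuffleId).clone()
}

object ShuffleLocsDemo {
  // Mirrors the loop in newOrUsedShuffleStage: keep only partitions that have an output.
  def presentLocs(locs: Array[ToyMapStatus]): Map[Int, ToyMapStatus] =
    locs.indices.filter(i => locs(i) ne null).map(i => i -> locs(i)).toMap

  def buildTracker(): ToyMapOutputTracker = {
    val t = new ToyMapOutputTracker
    t.registerShuffle(0, 3)
    t.registerMapOutput(0, 0, ToyMapStatus("host-a", Vector(10L, 20L)))
    t.registerMapOutput(0, 2, ToyMapStatus("host-b", Vector(5L))) // partition 1 is missing
    t
  }

  def main(args: Array[String]): Unit = {
    val tracker = buildTracker()
    val viaBytes = presentLocs(ToyCodec.deserialize(tracker.getSerializedMapOutputStatuses(0)))
    val direct = presentLocs(tracker.getMapOutputStatuses(0))
    println(viaBytes == direct)
  }
}
```

Returning a copy from the direct accessor is one possible design choice; it keeps the caller from mutating the tracker's state, at the cost of one array copy, which is still far cheaper than an encode/decode pass.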



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-14178) DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.

2016-03-26 Thread Guoqiang Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-14178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guoqiang Li updated SPARK-14178:

Summary: DAGScheduler should get map output statuses directly, not by 
MapOutputTrackerMaster.getSerializedMapOutputStatuses.  (was: Compare 
Option[String] and String directly)



