[jira] [Updated] (SPARK-14178) DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.
[ https://issues.apache.org/jira/browse/SPARK-14178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-14178:
------------------------------
    Target Version/s:   (was: 1.6.1, 2.0.0)

> DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.
> ---
>
>                 Key: SPARK-14178
>                 URL: https://issues.apache.org/jira/browse/SPARK-14178
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Guoqiang Li
>
> DAGScheduler gets map output statuses by {{MapOutputTrackerMaster.getSerializedMapOutputStatuses}}.
> [DAGScheduler.scala#L357|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357]
> {noformat}
>   private def newOrUsedShuffleStage(
>       shuffleDep: ShuffleDependency[_, _, _],
>       firstJobId: Int): ShuffleMapStage = {
>     val rdd = shuffleDep.rdd
>     val numTasks = rdd.partitions.length
>     val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
>     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
>       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
>       // Deserialization is very time-consuming.
>       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
>       (0 until locs.length).foreach { i =>
>         if (locs(i) ne null) {
>           // locs(i) will be null if missing
>           stage.addOutputLoc(i, locs(i))
>         }
>       }
>     } else {
>       // Kind of ugly: need to register RDDs with the cache and map output tracker here
>       // since we can't do it in the RDD constructor because # of partitions is unknown
>       logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
>       mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
>     }
>     stage
>   }
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
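
The issue title asks that the scheduler read map output statuses directly instead of serializing and then deserializing them. The following is only a minimal, self-contained sketch of that idea and is not Spark code: `ToyMapStatus`, `ToyMapOutputTracker`, `getMapOutputStatuses`, and `availableOutputs` are hypothetical names invented for illustration, and the real `MapOutputTrackerMaster` may expose something quite different.

```scala
import scala.collection.mutable

object DirectStatusSketch {
  // Hypothetical stand-in for Spark's MapStatus; only a host name is modeled.
  final case class ToyMapStatus(host: String)

  // Hypothetical, simplified tracker: one in-memory status array per shuffle ID.
  class ToyMapOutputTracker {
    private val statuses = mutable.Map.empty[Int, Array[ToyMapStatus]]

    // Allocates one slot per map task; slots stay null until an output is registered.
    def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
      statuses(shuffleId) = new Array[ToyMapStatus](numMaps)

    def registerMapOutput(shuffleId: Int, mapId: Int, status: ToyMapStatus): Unit =
      statuses(shuffleId)(mapId) = status

    def containsShuffle(shuffleId: Int): Boolean = statuses.contains(shuffleId)

    // Hypothetical direct accessor: returns a copy of the in-memory array,
    // so no serialize/deserialize round trip is needed.
    def getMapOutputStatuses(shuffleId: Int): Array[ToyMapStatus] =
      statuses(shuffleId).clone()
  }

  // Collects (partition index, status) pairs for outputs that are present,
  // mirroring the null check in the quoted loop.
  def availableOutputs(
      tracker: ToyMapOutputTracker,
      shuffleId: Int): Seq[(Int, ToyMapStatus)] = {
    val locs = tracker.getMapOutputStatuses(shuffleId)
    locs.indices.collect { case i if locs(i) ne null => (i, locs(i)) }
  }
}
```

In this sketch the accessor returns a copy of the array so that a caller cannot mutate the tracker's state. Whether a copy or a shared reference is appropriate in the scheduler itself is a design choice the issue does not settle.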
[jira] [Updated] (SPARK-14178) DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.
[ https://issues.apache.org/jira/browse/SPARK-14178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guoqiang Li updated SPARK-14178:
--------------------------------
    Summary: DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.  (was: Compare Option[String] and String directly)

> DAGScheduler should get map output statuses directly, not by MapOutputTrackerMaster.getSerializedMapOutputStatuses.
> ---
>
>                 Key: SPARK-14178
>                 URL: https://issues.apache.org/jira/browse/SPARK-14178
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Guoqiang Li