Guoqiang Li created SPARK-14178:
-----------------------------------

             Summary: Compare Option[String] and String directly
                 Key: SPARK-14178
                 URL: https://issues.apache.org/jira/browse/SPARK-14178
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
            Reporter: Guoqiang Li


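DAGScheduler obtains map output statuses through
{{MapOutputTrackerMaster.getSerializedMapOutputStatuses}}, which returns them in serialized form, so DAGScheduler then has to deserialize them again (see the code below). This deserialization is very time consuming.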

[DAGScheduler.scala#L357|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L357]
{noformat}
  /**
   * Creates a [[ShuffleMapStage]] for `shuffleDep` and seeds it from the map output tracker.
   *
   * If the tracker already contains this shuffle, every map output location it reports as
   * present is copied into the new stage. Otherwise the shuffle is registered with the
   * tracker using the RDD's partition count.
   *
   * @param shuffleDep the shuffle dependency whose RDD the new stage is built over
   * @param firstJobId job ID forwarded unchanged to `newShuffleMapStage`
   * @return the newly created stage
   */
  private def newOrUsedShuffleStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    val rdd = shuffleDep.rdd
    val numTasks = rdd.partitions.length
    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // NOTE(SPARK-14178): the statuses are fetched in serialized form and immediately
      // deserialized again here; this round trip is very time consuming.
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      locs.indices.foreach { i =>
        val loc = locs(i)
        // An entry is null when the output for that map partition is missing.
        if (loc ne null) {
          stage.addOutputLoc(i, loc)
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite})")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, numTasks)
    }
    stage
  }
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to