tomzhu created SPARK-22593:
------------------------------

             Summary: submitMissingTask in DagScheduler will call partitions 
function many times whch may be time consuming
                 Key: SPARK-22593
                 URL: https://issues.apache.org/jira/browse/SPARK-22593
             Project: Spark
          Issue Type: Question
          Components: Spark Core
    Affects Versions: 2.2.0
            Reporter: tomzhu
            Priority: Minor


when dagScheduler call submitMissing task, will create tasks and calling 
stage.rdd.partitions, it will can many times which may be time-consuming, the 
code is:
{quote}
    val tasks: Seq[Task[_]] = try {
      val serializedTaskMetrics = 
closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
              Option(sc.applicationId), sc.applicationAttemptId)
          }

        case stage: ResultStage =>
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)  //here is a little time  
consuming.
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, serializedTaskMetrics,
              Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
          }
      }
    } 
{quote}
for example, for a parallelCollectionRdd with 3 slices or partitions, to create 
task, the code will call stage.rdd.partitions three times, since 
stage.rdd.partitions will call getPartitions, so getPartions will call three 
times, it is a little time-cousuming. the stage.rdd.partitions code :

{quote}  
final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
        partitions_.zipWithIndex.foreach { case (partition, index) =>
          require(partition.index == index,
            s"partitions($index).partition == ${partition.index}, but it should 
equal $index")
        }
      }
      partitions_
    }
  }
{quote}

it would be better to avoid this.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to