[ 
https://issues.apache.org/jira/browse/SPARK-20219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960608#comment-15960608
 ] 

jin xing commented on SPARK-20219:
----------------------------------

[~kayousterhout] [~irashid]
Thanks a lot for taking a look at this :)  And sorry for the late reply.
The use cases are as follows:
1. A Spark SQL job in my cluster. The SQL is quite long and I'm hesitant to 
post it here (I can post it later if anyone wants to see it :)). The job has 
3 stages: Stage-1, Stage-2, and Stage-3. Stage-3 shuffle-reads from Stage-1 
and Stage-2 and has 2000 partitions (we set spark.sql.shuffle.partitions=2000). 
The distribution of the shuffle-read sizes is in the attached screenshot.
With the change in the PR, the total time cost of Stage-3 is 3654 seconds; 
without the change, it costs 4934 seconds. I supplied 50 executors to Stage-3 
(a common situation in a data warehouse, when a job fails to acquire enough 
containers from YARN). I think that is a good improvement.

2. I also did a small test in my local environment. The code is as follows:
{code}
    // Pair each name with 1, group by name, then emit (name, group size).
    val rdd = sc.textFile("/tmp/data", 9)
    rdd.map(name => (name, 1))
      .groupByKey
      .map { case (key, values) =>
        values.sum        // extra full pass over each group; adds CPU work per task
        (key, values.size)
      }
      .collect
      .foreach(println)
{code}
The input has 200 million lines; the content is people's names. In the 
ResultStage, the first 8 partitions are almost the same size, and the 9th 
partition is about 10 times the size of each of the first 8. (A hypothetical 
generator for input of this shape is sketched after the timings below.)
Running with the change, the result is:
17/04/07 11:50:52 INFO DAGScheduler: ResultStage 1 (collect at 
SparkArchetype.scala:26) finished in 23.027 s.
Running without the change, the result is:
17/04/07 11:54:27 INFO DAGScheduler: ResultStage 1 (collect at 
SparkArchetype.scala:26) finished in 34.546 s.
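
For reference, input with this shape can be produced with something like the 
sketch below. The file path matches the test above, but the name list and the 
skew ratio are illustrative assumptions, not the exact data I used:
{code}
// Hypothetical input generator -- illustrative only, not the exact data I
// used. One hot name takes every other line, so after groupByKey its reduce
// partition ends up roughly an order of magnitude larger than the others.
import java.io.PrintWriter

object GenSkewedNames {
  def main(args: Array[String]): Unit = {
    val names = Array("alice", "bob", "carol", "dave", "erin", "frank", "grace", "heidi")
    val out = new PrintWriter("/tmp/data")       // same path as the test above
    var i = 0
    while (i < 200000000) {                      // 200m lines, as in the test
      out.println(if (i % 2 == 0) "mallory" else names((i / 2) % names.length))
      i += 1
    }
    out.close()
  }
}
{code}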

In my warehouse there are lots of cases like the first one described above, 
so I really hope this idea can be taken into consideration. I'm sorry for the 
complexity it brings in, and I'd be very thankful for any advice on a better 
implementation.
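
To make the intent concrete, here is a minimal sketch of the ordering idea 
only, not the actual patch; pendingOrder and its example sizes are 
illustrative, and in the real change the per-task sizes would come from the 
map output sizes tracked by the driver:
{code}
// Minimal sketch of the ordering idea -- not the actual patch. Task indices
// are sorted by estimated shuffle input size, descending, so the "large
// tasks" are launched first instead of waiting at the end of the queue.
object SizeAwareOrdering {
  def pendingOrder(bytesByPartition: Array[Long]): Seq[Int] =
    bytesByPartition.indices.sortBy(i => -bytesByPartition(i))

  def main(args: Array[String]): Unit = {
    val sizes = Array(10L, 12L, 11L, 500L, 9L)   // task 3 is the straggler
    println(pendingOrder(sizes).mkString(", "))  // prints: 3, 1, 2, 0, 4
  }
}
{code}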

> Schedule tasks based on size of input from ScheduledRDD
> -------------------------------------------------------
>
>                 Key: SPARK-20219
>                 URL: https://issues.apache.org/jira/browse/SPARK-20219
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>         Attachments: screenshot-1.png
>
>
> When data is highly skewed on a ShuffledRDD, it makes sense to launch the 
> tasks that process much more input as early as possible. The current 
> scheduling mechanism in *TaskSetManager* is quite simple:
> {code}
>   for (i <- (0 until numTasks).reverse) {
>     addPendingTask(i)
>   }
> {code}
> In the scenario where the "large tasks" sit in the bottom half of the tasks 
> array, launching the tasks with much more input early can significantly 
> reduce the time cost and save resources when *dynamic allocation* is disabled.


