[ https://issues.apache.org/jira/browse/SPARK-20219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960608#comment-15960608 ]
jin xing commented on SPARK-20219:
----------------------------------

[~kayousterhout] [~irashid] Thanks a lot for taking a look at this :) And sorry for the late reply. The use cases are as below:

1. A Spark SQL job in my cluster. The SQL is quite long, so I'm hesitant to post it here (I can post it later if anyone wants to see it :)). The job has 3 stages: Stage-1, Stage-2 and Stage-3. Stage-3 shuffle-reads from Stage-1 and Stage-2, and has 2000 partitions (we set spark.sql.shuffle.partitions=2000). The distribution of the shuffle-read sizes is in the attached screenshot. Running with the change in the PR, the total time cost of Stage-3 is 3654 seconds; without the change, it is 4934 seconds. I supplied 50 executors to Stage-3 (this is common in a data warehouse when the job fails to acquire enough containers from YARN). I think the improvement here is a good one.

2. I also ran a small test in my local environment. The code is as below:
{code}
val rdd = sc.textFile("/tmp/data", 9)
rdd.map { num => (num, 1) }
  .groupByKey
  .map { case (key, iter) => (key, iter.sum) }
  .collect
  .foreach(println)
{code}
The RDD has 200m lines; the content is people's names. In the ResultStage, the first 8 partitions are almost the same size, and the 9th partition is 10 times the size of the first 8. Running with the change, the result is:
17/04/07 11:50:52 INFO DAGScheduler: ResultStage 1 (collect at SparkArchetype.scala:26) finished in 23.027 s
Running without the change, the result is:
17/04/07 11:54:27 INFO DAGScheduler: ResultStage 1 (collect at SparkArchetype.scala:26) finished in 34.546 s

In my warehouse there are lots of cases like the first one described above, so I really hope this idea can be taken into consideration. I'm sorry for the complexity this brings in, and I'd be very thankful for any advice on a better implementation.
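To make the idea concrete, here is a minimal standalone sketch of the scheduling change under discussion (the names ScheduleBySizeSketch, orderBySize and taskInputSizes are made up for illustration; this is not Spark's actual TaskSetManager code): pending task indices are ordered by estimated shuffle-read size, largest first, so a skewed partition launches as early as possible.

```scala
// Hypothetical sketch (not Spark internals): order pending task indices
// by estimated shuffle-read size, descending, so the largest task runs first.
object ScheduleBySizeSketch {
  // taskInputSizes(i) = estimated input bytes for task/partition i (made-up name)
  def orderBySize(taskInputSizes: Array[Long]): Seq[Int] =
    taskInputSizes.indices.sortBy(i => -taskInputSizes(i))

  def main(args: Array[String]): Unit = {
    // 9 partitions, the 9th ~10x the others, as in the local test above
    val sizes = Array.fill(8)(100L) :+ 1000L
    // The skewed partition (index 8) comes first in the launch order
    println(orderBySize(sizes).mkString(", "))
  }
}
```

With a uniform distribution this ordering changes nothing; the win appears only under skew, which matches the Stage-3 numbers above.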
> Schedule tasks based on size of input from ScheduledRDD
> -------------------------------------------------------
>
>                 Key: SPARK-20219
>                 URL: https://issues.apache.org/jira/browse/SPARK-20219
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>         Attachments: screenshot-1.png
>
>
> When data is highly skewed on a ShuffledRDD, it makes sense to launch the
> tasks that process much more input as soon as possible. The current
> scheduling mechanism in *TaskSetManager* is quite simple:
> {code}
> for (i <- (0 until numTasks).reverse) {
>   addPendingTask(i)
> }
> {code}
> In the scenario where "large tasks" sit in the bottom half of the tasks
> array, launching the tasks with much more input early can significantly
> reduce the time cost and save resources when *"dynamic allocation"* is
> disabled.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)