[
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100517#comment-17100517
]
serdar onur commented on SPARK-19371:
-
The year is 2020 and I am still trying to find a solution to this. I totally
understand what [~thunderstumpges] was trying to achieve, and I am trying to
achieve the same. For a tool like Spark, it is unacceptable not to be able to
distribute the created partitions evenly across the executors. We can already
write a custom partitioner that distributes the data evenly across partitions
by computing our own partition index. I was under the impression that a similar
approach could be applied to spread these partitions evenly across the
executors (using some sort of executor index to select executors during
partition distribution). I have been googling this for a day now, and I am very
disappointed to say that so far this does not seem to be possible.
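
For reference, the partition-level half of this is roughly the following. It
is a minimal sketch, not code from this issue: the class name IndexPartitioner
is hypothetical, and it assumes every record has already been keyed by a Long
index (for example via zipWithIndex).

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner (name and key convention are assumptions):
// keys are precomputed Long indices, assigned round-robin to partitions.
class IndexPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, "numPartitions must be positive")

  override def getPartition(key: Any): Int = key match {
    // Double modulo keeps the result non-negative for negative indices.
    case i: Long => (((i % numPartitions) + numPartitions) % numPartitions).toInt
    case other =>
      throw new IllegalArgumentException(s"Expected a Long index, got: $other")
  }

  // Spark compares partitioners to decide whether a shuffle can be skipped.
  override def equals(other: Any): Boolean = other match {
    case p: IndexPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```

It would be applied with something like
rdd.zipWithIndex.map(_.swap).partitionBy(new IndexPartitioner(n)). Note that
this only decides which partition a record lands in. It has no say over which
executor ends up holding that partition, which is exactly the missing piece.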
> Cannot spread cached partitions evenly across executors
> ---
>
> Key: SPARK-19371
> URL: https://issues.apache.org/jira/browse/SPARK-19371
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.1
> Reporter: Thunder Stumpges
> Priority: Major
> Labels: bulk-closed
> Attachments: RDD Block Distribution on two executors.png, Unbalanced
> RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and
> resulting task imbalance.png, execution timeline.png
>
>
> Before running an intensive iterative job (in this case a distributed topic
> model training), we need to load a dataset and persist it across executors.
> After loading from HDFS and persisting, the partitions are spread unevenly
> across executors (based on the initial scheduling of the reads, which are not
> data-locality sensitive). The partition sizes are even, just not their
> distribution over executors. We currently have no way to force the partitions
> to spread evenly, and as the iterative algorithm begins, tasks are
> distributed to executors based on this initial load, forcing some very
> unbalanced work.
> This has been mentioned a
> [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
> of
> [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
> in
> [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
> user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here
> are examples of things I have tried. All resulted in partitions in memory
> that were NOT evenly distributed to executors, causing future tasks to be
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
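> val numPartitions = 48*16
> // Load, shuffle into numPartitions partitions, and mark for caching
> val df = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions).
>   persist
> // Run an action so the cache is actually materialized
> df.count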
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to
> desired partitions*
> {code}
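> val numPartitions = 48*16
> // First shuffle into twice the desired number of partitions
> val df2 = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions*2)
> // Shuffle again down to the desired count, then mark for caching
> val df = df2.repartition(numPartitions).
>   persist
> // Run an action so the cache is actually materialized
> df.count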
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request
> that those partitions be stored evenly across executors in preparation for
> future tasks.
> I'm not sure if this is a more general issue (i.e. not just involving
> persisting RDDs), but for the persisted in-memory case, it can make a HUGE
> difference in the overall running time of the remaining work.
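
One way to put numbers on the imbalance described above, besides reading the
Storage tab of the UI, is to count how many partition tasks run on each
executor. This is only a sketch under assumptions: the helper name
partitionsPerExecutor is hypothetical, and it reports where the tasks were
scheduled, which for a persisted dataset usually follows the cached block
locations but is not guaranteed to match them.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.rdd.RDD

// Hypothetical helper: maps executorId -> number of partitions whose task
// ran on that executor. Each task emits the id of the executor it runs on.
def partitionsPerExecutor[T](rdd: RDD[T]): Map[String, Int] =
  rdd
    .mapPartitions(_ => Iterator(SparkEnv.get.executorId))
    .collect()
    .groupBy(identity)
    .map { case (executorId, hits) => executorId -> hits.length }
```

For the DataFrame in the examples above, this could be called as
partitionsPerExecutor(df.rdd) after df.count has materialized the cache. An
even spread would show roughly numPartitions divided by the number of
executors for every entry.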
--
This message was sent by Atlassian Jira
(v8.3.4#803005)