[jira] [Created] (SPARK-31649) Spread partitions evenly to spark executors

2020-05-06 Thread serdar onur (Jira)
serdar onur created SPARK-31649:
---

 Summary: Spread partitions evenly to spark executors
 Key: SPARK-31649
 URL: https://issues.apache.org/jira/browse/SPARK-31649
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.4
Reporter: serdar onur


The year is 2020 and I am still trying to find a solution to this. I fully 
understand what [~thunderstumpges] was trying to achieve, and I am trying to 
achieve the same thing. For a tool like Spark, it is unacceptable not to be able 
to distribute the created partitions evenly across the executors. We can already 
write a custom partitioner that spreads the data evenly over the partitions by 
assigning our own partition index, and I was under the impression that a similar 
approach could be used to spread those partitions evenly across the executors 
(using some sort of executor index to pick an executor during partition 
placement). I have been searching for a day now, and I am very disappointed to 
say that, so far, this does not seem to be possible.
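
For illustration, a minimal sketch of the half of this idea that Spark does 
support today: a custom Partitioner driven by an explicit, caller-assigned 
partition index. The class name IndexPartitioner and the round-robin keying are 
hypothetical, not an existing API, and there is no analogous public hook for 
choosing which executor a partition ends up cached on.

{code}
import org.apache.spark.Partitioner

// Hypothetical partitioner: records are keyed by an explicit partition
// index chosen by the caller, so data can be spread evenly over partitions.
class IndexPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case i: Int => ((i % numPartitions) + numPartitions) % numPartitions
    case _      => 0
  }
}

// Usage sketch: assign a round-robin index as the key, then partitionBy.
// val keyed = data.zipWithIndex.map { case (v, i) => (i.toInt, v) }
// val even  = keyed.partitionBy(new IndexPartitioner(48 * 16)).values
{code}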

Note: I am disappointed that the issue below was moved to a resolved state 
without anything actually being done about it.

https://issues.apache.org/jira/browse/SPARK-19371




[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2020-05-06 Thread serdar onur (Jira)


[ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100517#comment-17100517 ]

serdar onur commented on SPARK-19371:
-

The year is 2020 and I am still trying to find a solution to this. I fully 
understand what [~thunderstumpges] was trying to achieve, and I am trying to 
achieve the same thing. For a tool like Spark, it is unacceptable not to be able 
to distribute the created partitions evenly across the executors. We can already 
write a custom partitioner that spreads the data evenly over the partitions by 
assigning our own partition index, and I was under the impression that a similar 
approach could be used to spread those partitions evenly across the executors 
(using some sort of executor index to pick an executor during partition 
placement). I have been searching for a day now, and I am very disappointed to 
say that, so far, this does not seem to be possible.
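
As an illustration of the imbalance being described (not a fix), one way to see 
where each partition of a persisted DataFrame is processed, assuming a DataFrame 
named df that has already been persisted and materialized. This reports the host 
a task ran on, which is only a proxy for the executor holding the cached block 
(several executors may share a host).

{code}
// Sketch: report, per partition, the host that processed it on this pass.
// For an already-cached DataFrame this usually reflects where the blocks live.
val placement = df.rdd
  .mapPartitionsWithIndex { (idx, it) =>
    Iterator((java.net.InetAddress.getLocalHost.getHostName, idx, it.size))
  }
  .collect()

placement.groupBy(_._1).foreach { case (host, parts) =>
  println(s"$host -> ${parts.length} partitions")
}
{code}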

> Cannot spread cached partitions evenly across executors
> ---
>
> Key: SPARK-19371
> URL: https://issues.apache.org/jira/browse/SPARK-19371
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.1
>Reporter: Thunder Stumpges
>Priority: Major
>  Labels: bulk-closed
> Attachments: RDD Block Distribution on two executors.png, Unbalanced 
> RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and 
> resulting task imbalance.png, execution timeline.png
>
>
> Before running an intensive iterative job (in this case a distributed topic 
> model training), we need to load a dataset and persist it across executors. 
> After loading from HDFS and persisting, the partitions are spread unevenly 
> across executors (based on the initial scheduling of the reads, which are not 
> data-locality sensitive). The partition sizes are even, just not their 
> distribution over executors. We currently have no way to force the partitions 
> to spread evenly, and as the iterative algorithm begins, tasks are 
> distributed to executors based on this initial load, forcing some very 
> unbalanced work.
> This has been mentioned a 
> [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
>  of 
> [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
>  in 
> [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
>  user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here 
> are examples of things I have tried. All resulted in partitions in memory 
> that were NOT evenly distributed to executors, causing future tasks to be 
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48 * 16
> val df = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions).
>   persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to 
> desired partitions*
> {code}
> val numPartitions = 48 * 16
> val df2 = sqlContext.read.
>   parquet("/data/folder_to_load").
>   repartition(numPartitions * 2)
> val df = df2.repartition(numPartitions).
>   persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request 
> that those partitions be stored evenly across executors in preparation for 
> future tasks. 
> I'm not sure whether this is a more general issue (i.e. not just involving 
> persisting RDDs), but for the persisted in-memory case it can make a HUGE 
> difference in the overall running time of the remaining work.


