[ https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16252953#comment-16252953 ]

Thunder Stumpges commented on SPARK-19371:
------------------------------------------

In my case, it is important because that cached RDD is partitioned carefully 
and joined with a streaming dataset every 10 seconds to perform processing. 
The stream RDD is shuffled to align/join with this dataset, and the work is 
then done. This runs for days or months at a time. If I could move the RDD 
blocks just once at the beginning so they're balanced, the benefit of that 
one-time move (all in memory) would pay off many times over. 
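
For context, here is roughly what the job looks like (a simplified sketch, not 
the actual code; the path, key extraction, host/port and partition count are 
placeholders, and it assumes a Spark 1.6 SparkContext "sc" and 
StreamingContext "ssc"):

{code}
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(48 * 16)

// Carefully partitioned, cached reference RDD (keyed data)
val reference = sc.textFile("/data/reference")
  .map(line => (line.split("\t")(0), line))
  .partitionBy(partitioner)
  .persist()
reference.count() // materialize the cache once, up front

// Every 10-second batch is shuffled to align with the reference partitioner,
// then joined and processed
val batches = ssc.socketTextStream("somehost", 9999)
  .map(line => (line.split("\t")(0), line))
val joined = batches.transform(rdd => rdd.partitionBy(partitioner).join(reference))
joined.foreachRDD(rdd => rdd.count()) // stand-in for the real per-batch work
{code}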

Setting spark.locality.wait=0 only makes this same network traffic ongoing (a 
task starting on one executor while its RDD block lives on another), correct? 
BTW, I have tried my job with spark.locality.wait=0, but it seems to have 
limited effect. Tasks are still being executed on the executors holding the 
RDD blocks. Only two tasks did NOT run as PROCESS_LOCAL, and they took many 
times longer to execute:

!execution timeline.png!
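
(For completeness, the setting can be applied either on the command line or 
programmatically before the context is created; nothing exotic:)

{code}
// spark-submit --conf spark.locality.wait=0 ...
// or, in code, before building the SparkContext:
val conf = new org.apache.spark.SparkConf()
  .setAppName("streaming-join")
  .set("spark.locality.wait", "0")
{code}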

It all seems very clear in my head; am I missing something? Is this a really 
complicated thing to do? 

Basically the operation would be a "rebalance" that applies to cached RDDs and 
would attempt to balance the distribution of blocks across executors. 
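
In the meantime, the skew itself is easy to see from the driver with the 
developer API (a rough sketch; assumes the persisted RDD is in a val named 
cachedRdd):

{code}
// Count the cached blocks of a given RDD per executor (Spark 1.x developer API)
val rddId = cachedRdd.id
sc.getExecutorStorageStatus.foreach { status =>
  val blocks = status.rddBlocksById(rddId).size
  println(s"executor=${status.blockManagerId.executorId} " +
    s"host=${status.blockManagerId.host} rddBlocks=$blocks")
}
{code}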

> Cannot spread cached partitions evenly across executors
> -------------------------------------------------------
>
>                 Key: SPARK-19371
>                 URL: https://issues.apache.org/jira/browse/SPARK-19371
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.1
>            Reporter: Thunder Stumpges
>         Attachments: Unbalanced RDD Blocks, and resulting task imbalance.png, 
> Unbalanced RDD Blocks, and resulting task imbalance.png, execution 
> timeline.png
>
>
> Before running an intensive iterative job (in this case a distributed topic 
> model training), we need to load a dataset and persist it across executors. 
> After loading from HDFS and persisting, the partitions are spread unevenly 
> across executors (based on the initial scheduling of the reads, which are 
> not data-locality sensitive). The partition sizes are even, just not their 
> distribution over executors. We currently have no way to force the partitions 
> to spread evenly, and as the iterative algorithm begins, tasks are 
> distributed to executors based on this initial load, forcing some very 
> unbalanced work.
> This has been mentioned a 
> [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
>  of 
> [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
>  in 
> [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
>  user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here 
> are examples of things I have tried. All resulted in partitions in memory 
> that were NOT evenly distributed to executors, causing future tasks to be 
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions).
>     persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to 
> desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
>     persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request 
> that those partitions be stored evenly across executors in preparation for 
> future tasks. 
> I'm not sure if this is a more general issue (i.e. not just involving 
> persisting RDDs), but for the persisted in-memory case, it can make a HUGE 
> difference in the overall running time of the remaining work.
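> Something along these lines (purely hypothetical; the "balanced" placement 
> hint below does not exist today) is what I have in mind:
> {code}
> import org.apache.spark.storage.StorageLevel
>
> val numPartitions = 48*16
> val df = sqlContext.read.
>     parquet("/data/folder_to_load").
>     repartition(numPartitions)
> df.persist(StorageLevel.MEMORY_ONLY)  // what we can do now: no placement control
> // df.persist(StorageLevel.MEMORY_ONLY, placement = "balanced")  // wished-for
> df.count
> {code}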


