[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2020-05-06 Thread Thunder Stumpges (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100878#comment-17100878
 ] 

Thunder Stumpges commented on SPARK-19371:
--

Thank you for the comments [~honor], [~danmeany], and [~lebedev]! I am glad to 
see there are others with this issue. We have had to "just live with it" all 
these years, and this job is STILL running in production, every 10 seconds, 
wasting computing resources and time due to imbalanced cached partitions across 
the executors. 

> Cannot spread cached partitions evenly across executors
> ---
>
> Key: SPARK-19371
> URL: https://issues.apache.org/jira/browse/SPARK-19371
> Project: Spark
>  Issue Type: Bug
>Affects Versions: 1.6.1
>Reporter: Thunder Stumpges
>Priority: Major
>  Labels: bulk-closed
> Attachments: RDD Block Distribution on two executors.png, Unbalanced 
> RDD Blocks, and resulting task imbalance.png, Unbalanced RDD Blocks, and 
> resulting task imbalance.png, execution timeline.png
>
>
> Before running an intensive iterative job (in this case a distributed topic 
> model training), we need to load a dataset and persist it across executors. 
> After loading from HDFS and persisting, the partitions are spread unevenly 
> across executors (based on the initial scheduling of the reads, which are not 
> data-locality aware). The partition sizes are even, just not their 
> distribution over executors. We currently have no way to force the partitions 
> to spread evenly, and as the iterative algorithm begins, tasks are 
> distributed to executors based on this initial load, forcing some very 
> unbalanced work.
> This has been mentioned a 
> [number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
>  of 
> [times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
>  in 
> [various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
>  user/dev group threads.
> None of the discussions I could find had solutions that worked for me. Here 
> are examples of things I have tried. All resulted in partitions in memory 
> that were NOT evenly distributed to executors, causing future tasks to be 
> imbalanced across executors as well.
> *Reduce Locality*
> {code}spark.shuffle.reduceLocality.enabled=false/true{code}
> *"Legacy" memory mode*
> {code}spark.memory.useLegacyMode = true/false{code}
> *Basic load and repartition*
> {code}
> val numPartitions = 48*16
> val df = sqlContext.read.
> parquet("/data/folder_to_load").
> repartition(numPartitions).
> persist
> df.count
> {code}
> *Load and repartition to 2x partitions, then shuffle repartition down to 
> desired partitions*
> {code}
> val numPartitions = 48*16
> val df2 = sqlContext.read.
> parquet("/data/folder_to_load").
> repartition(numPartitions*2)
> val df = df2.repartition(numPartitions).
> persist
> df.count
> {code}
> It would be great if, when persisting an RDD/DataFrame, we could request 
> that those partitions be stored evenly across executors in preparation for 
> future tasks. 
> I'm not sure if this is a more general issue (i.e. not just involving 
> persisting RDDs), but for the persisted in-memory case, it can make a HUGE 
> difference in the overall running time of the remaining work.
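
For illustration, a minimal sketch of how the settings and load pattern above fit together (Spark 1.6-era APIs; the app name and values here are examples only):

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative only: the two configuration properties tried above, set on a SparkConf.
val conf = new SparkConf()
  .setAppName("cached-partition-balance-test")            // example name
  .set("spark.shuffle.reduceLocality.enabled", "false")   // also tried with "true"
  .set("spark.memory.useLegacyMode", "true")              // also tried with "false"

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Same load-and-persist pattern as in the description.
val numPartitions = 48 * 16
val df = sqlContext.read.parquet("/data/folder_to_load").repartition(numPartitions).persist()
df.count()   // materialize the cache; block placement can still come out uneven
{code}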



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252953#comment-16252953
 ] 

Thunder Stumpges edited comment on SPARK-19371 at 11/15/17 5:50 AM:


In my case, it is important because that cached RDD is partitioned carefully, 
and used to join to a streaming dataset every 10 seconds to perform processing. 
The stream RDD is shuffled to align/join with this data-set, and work is then 
done. This runs for days or months at a time. If I could just once at the 
beginning move the RDD blocks so they're balanced, the benefit of that one time 
move (all in memory) would pay off many times over. 

Setting locality.wait=0 only causes this same network traffic to be ongoing (a 
task starting on one executor and its RDD block being on another executor), 
correct? BTW, I have tried my job with spark.locality.wait=0, but it seems to 
have limited effect. Tasks are still being executed on the executors with the 
RDD blocks. Only two tasks did NOT run as PROCESS_LOCAL, and they took many 
times longer to execute:

!execution timeline.png|width=800, height=344!

It all seems very clear in my head, am I missing something? Is this a really 
complicated thing to do? 

Basically the operation would be a "rebalance" that applies to cached RDDs and 
would attempt to balance the distribution of blocks across executors. 


was (Author: thunderstumpges):
In my case, it is important because that cached RDD is partitioned carefully, 
and used to join to a streaming dataset every 10 seconds to perform processing. 
The stream RDD is shuffled to align/join with this data-set, and work is then 
done. This runs for days or months at a time. If I could just once at the 
beginning move the RDD blocks so they're balanced, the benefit of that one time 
move (all in memory) would pay off many times over. 

Setting locality.wait=0 only causes this same network traffic to be ongoing (a 
task starting on one executor and its RDD block being on another executor), 
correct? BTW, I have tried my job with spark.locality.wait=0, but it seems to 
have limited effect. Tasks are still being executed on the executors with the 
RDD blocks. Only two tasks did NOT run as PROCESS_LOCAL, and they took many 
times longer to execute:

!execution timeline.png!

It all seems very clear in my head, am I missing something? Is this a really 
complicated thing to do? 

Basically the operation would be a "rebalance" that applies to cached RDDs and 
would attempt to balance the distribution of blocks across executors. 


[jira] [Comment Edited] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252969#comment-16252969
 ] 

Thunder Stumpges edited comment on SPARK-19371 at 11/15/17 5:52 AM:


Here's a view of something that happened this time around as I started the job 
(this time with locality.wait using default). This RDD somehow got all 120 
partitions on ONLY TWO executors, both on the same physical machine. From now 
on, all jobs either run on that one single machine, or (if I set 
locality.wait=0) move that data to other executors before executing, correct?
!RDD Block Distribution on two executors.png|width=800, height=306!



was (Author: thunderstumpges):
Here's a view of something that happened this time around as I started the job 
(this time with locality.wait using default). This RDD somehow got all 120 
partitions on ONLY TWO executors, both on the same physical machine. From now 
on, all jobs either run on that one single machine, or (if I set 
locality.wait=0) move that data to other executors before executing, correct?
!RDD Block Distribution on two executors.png!





[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252969#comment-16252969
 ] 

Thunder Stumpges commented on SPARK-19371:
--

Here's a view of something that happened this time around as I started the job 
(this time with locality.wait using default). This RDD somehow got all 120 
partitions on ONLY TWO executors, both on the same physical machine. From now 
on, all jobs either run on that one single machine, or (if I set 
locality.wait=0) move that data to other executors before executing, correct?
!RDD Block Distribution on two executors.png!





[jira] [Updated] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thunder Stumpges updated SPARK-19371:
-
Attachment: RDD Block Distribution on two executors.png




[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16252953#comment-16252953
 ] 

Thunder Stumpges commented on SPARK-19371:
--

In my case, it is important because that cached RDD is partitioned carefully, 
and used to join to a streaming dataset every 10 seconds to perform processing. 
The stream RDD is shuffled to align/join with this data-set, and work is then 
done. This runs for days or months at a time. If I could just once at the 
beginning move the RDD blocks so they're balanced, the benefit of that one time 
move (all in memory) would pay off many times over. 

Setting locality.wait=0 only causes this same network traffic to be ongoing (a 
task starting on one executor and its RDD block being on another executor), 
correct? BTW, I have tried my job with spark.locality.wait=0, but it seems to 
have limited effect. Tasks are still being executed on the executors with the 
RDD blocks. Only two tasks did NOT run as PROCESS_LOCAL, and they took many 
times longer to execute:

!execution timeline.png!

It all seems very clear in my head, am I missing something? Is this a really 
complicated thing to do? 

Basically the operation would be a "rebalance" that applies to cached RDDs and 
would attempt to balance the distribution of blocks across executors. 
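
A minimal diagnostic sketch of how that skew can be observed from the driver (assuming a spark-shell style sc; storage-memory usage is only a rough proxy for how many cached RDD blocks each executor holds):

{code}
// Sketch only: print per-executor storage memory usage after materializing the cache.
// Executors holding more cached RDD blocks will generally show more memory used.
sc.getExecutorMemoryStatus.toSeq.sortBy(_._1).foreach { case (executor, (maxMem, remaining)) =>
  val usedMB = (maxMem - remaining) / 1024.0 / 1024.0
  val maxMB  = maxMem / 1024.0 / 1024.0
  println(f"$executor%-30s used $usedMB%.1f of $maxMB%.1f MB storage memory")
}
{code}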




[jira] [Updated] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thunder Stumpges updated SPARK-19371:
-
Attachment: execution timeline.png




[jira] [Comment Edited] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251971#comment-16251971
 ] 

Thunder Stumpges edited comment on SPARK-19371 at 11/14/17 7:04 PM:


Hi [~sowen],

From my perspective, you can attach whatever label you want (bug, feature, 
story, PITA), but it is obviously causing trouble for more than just an 
isolated case. 

And to re-state the issue, this is NOT (in my case at least) related to 
partitioner / hashing. My partitions are sized evenly, the keys are unique 
words (String) in a vocabulary. The issue is related to where the RDD blocks 
are distributed to the executors in cache storage. See the image below which 
shows the problem. 

What would solve this for me is not really a "shuffle" but a "rebalance 
storage" that moves RDD blocks in memory to balance them across executors.

!Unbalanced RDD Blocks, and resulting task imbalance.png!


was (Author: thunderstumpges):
Hi Sean,

From my perspective, you can attach whatever label you want (bug, feature, 
story, PITA), but it is obviously causing trouble for more than just an 
isolated case. 

And to re-state the issue, this is NOT (in my case at least) related to 
partitioner / hashing. My partitions are sized evenly, the keys are unique 
words (String) in a vocabulary. The issue is related to where the RDD blocks 
are distributed to the executors in cache storage. See the image below which 
shows the problem. 

What would solve this for me is not really a "shuffle" but a "rebalance 
storage" that moves RDD blocks in memory to balance them across executors.

!Unbalanced RDD Blocks, and resulting task imbalance.png!




[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16251971#comment-16251971
 ] 

Thunder Stumpges commented on SPARK-19371:
--

Hi Sean,

From my perspective, you can attach whatever label you want (bug, feature, 
story, PITA), but it is obviously causing trouble for more than just an 
isolated case. 

And to re-state the issue, this is NOT (in my case at least) related to 
partitioner / hashing. My partitions are sized evenly, the keys are unique 
words (String) in a vocabulary. The issue is related to where the RDD blocks 
are distributed to the executors in cache storage. See the image below which 
shows the problem. 

What would solve this for me is not really a "shuffle" but a "rebalance 
storage" that moves RDD blocks in memory to balance them across executors.

!Unbalanced RDD Blocks, and resulting task imbalance.png!




[jira] [Updated] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thunder Stumpges updated SPARK-19371:
-
Attachment: Unbalanced RDD Blocks, and resulting task imbalance.png




[jira] [Updated] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-11-14 Thread Thunder Stumpges (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thunder Stumpges updated SPARK-19371:
-
Attachment: Unbalanced RDD Blocks, and resulting task imbalance.png




[jira] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-30 Thread Thunder Stumpges (JIRA)
Thunder Stumpges commented on SPARK-19371:
--

Correct, I have tried both of these approaches to shuffling the data, each of which resulted in partitions not evenly balanced across executors: 
DataFrame.repartition does not have a shuffle option, so this just goes from 2x the number of partitions down to the desired count:

{code}
val numPartitions = 48*16
val df2 = sqlContext.read.
parquet("/data/folder_to_load").
repartition(numPartitions*2)
val df = df2.repartition(numPartitions).
persist
df.count
{code}

RDD coalesce with shuffle, recreation of DataFrame:

{code}
val numPartitions = 48*16
val df2 = sqlContext.read.
parquet("/data/folder_to_load").
repartition(numPartitions*2)
val df = sqlContext.
   createDataFrame(df2.rdd.coalesce(numPartitions,true),df2.schema).
   persist
{code}

Both of these result in the same imbalance. 
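
A related diagnostic sketch, for illustration (using the df persisted in the block above): run one lightweight task per partition of the persisted DataFrame and record the host it executes on; with default locality settings this approximates where the cached blocks ended up:

{code}
import org.apache.spark.SparkEnv

// Sketch only: count how many of df's partitions are processed on each executor host.
val partitionsPerHost = df.rdd
  .mapPartitions { _ => Iterator((SparkEnv.get.blockManager.blockManagerId.host, 1)) }
  .reduceByKey(_ + _)
  .collect()

partitionsPerHost.sortBy(_._1).foreach { case (host, n) =>
  println(s"$host: $n partitions")
}
{code}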


[jira] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-30 Thread Thunder Stumpges (JIRA)
Thunder Stumpges commented on SPARK-19371:
--

{quote}Yes, but don't you show that some executors have more blocks? They would attract more tasks seeking locality.{quote}
Yes, that is basically the core of this entire issue: so far I have found no way to distribute the blocks evenly, and therefore no way to balance the tasks. Maybe I'm not following what you are suggesting I do. (Note that I DID try a repartition with shuffle, as mentioned in the original issue, with no success in balancing partitions.)

{quote}Try spark.locality.wait=0 and see if it's different.{quote}
Wouldn't this incur significantly more network IO? That would completely ignore the fact that the partitions exist in memory on the executor, right? I can try this, but I'm not sure that trade-off is going to work.

Could you summarize your suggestions regarding resolution of this issue?


[jira] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-30 Thread Thunder Stumpges (JIRA)
Thunder Stumpges edited a comment on SPARK-19371:
--

{quote}... one execution slot is as good as another, because it's a core operating on local memory. I do believe Spark already prefers spreading tasks across executors, all else equal.{quote}
So, this does not seem to be what we're seeing. Here's output from our executors tab after running a while. Notice the executors with more RDD partitions have proportionally more tasks executed.

||Executor ID||RDD Blocks||Storage Memory||Disk Used||Active Tasks||Failed Tasks||Complete Tasks||Total Tasks||
|1|8|91.2 MB / 2.7 GB|0.0 B|0|0|440804|440804|
|10|30|342.0 MB / 2.7 GB|0.0 B|0|0|722964|722964|
|11|8|91.2 MB / 2.7 GB|0.0 B|0|0|54|54|
|12|27|307.8 MB / 2.7 GB|0.0 B|0|0|780879|780879|

With RDD partitions cached, wouldn't the scheduler prefer to run the task on the executor containing the blocks?



[jira] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-30 Thread Thunder Stumpges (JIRA)
Thunder Stumpges edited a comment on SPARK-19371:
--

{quote}... one execution slot is as good as another, because it's a core operating on local memory. I do believe Spark already prefers spreading tasks across executors, all else equal.{quote}
So, this does not seem to be what we're seeing. Here's output from our executors tab after running a while. Notice the executors with more RDD partitions have proportionally more tasks executed.

||Executor ID||RDD Blocks||Storage Memory||Disk Used||Active Tasks||Failed Tasks||Complete Tasks||Total Tasks||
|1|8|91.2 MB / 2.7 GB|0.0 B|0|0|440804|440804|
|10|30|342.0 MB / 2.7 GB|0.0 B|0|0|722964|722964|
|11|8|91.2 MB / 2.7 GB|0.0 B|0|0|54|54|
|12|27|307.8 MB / 2.7 GB|0.0 B|0|0|780879|780879|

With RDD partitions cached, wouldn't the scheduler prefer to run the task on the executor containing the blocks?



[jira] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-30 Thread Thunder Stumpges (JIRA)
Thunder Stumpges edited a comment on SPARK-19371:
--

{quote}... one execution slot is as good as another, because it's a core operating on local memory. I do believe Spark already prefers spreading tasks across executors, all else equal.{quote}
So, this does not seem to be what we're seeing. Here's output from our executors tab after running a while. Notice the executors with more RDD partitions have proportionally more tasks executed.

||Executor ID||RDD Blocks||Storage Memory||Disk Used||Active Tasks||Failed Tasks||Complete Tasks||Total Tasks||
|1|8|91.2 MB / 2.7 GB|0.0 B|0|0|440804|440804|
|10|30|342.0 MB / 2.7 GB|0.0 B|0|0|722964|722964|
|11|8|91.2 MB / 2.7 GB|0.0 B|0|0|54|54|
|12|27|307.8 MB / 2.7 GB|0.0 B|0|0|780879|780879|



[jira] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-30 Thread Thunder Stumpges (JIRA)
Thunder Stumpges commented on SPARK-19371:
--

{quote}... one execution slot is as good as another, because it's a core operating on local memory. I do believe Spark already prefers spreading tasks across executors, all else equal.{quote}
So, this does not seem to be what we're seeing. Here's output from our executors tab after running a while. Notice the executors with more RDD partitions have proportionally more tasks executed. 



[jira] [Commented] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-26 Thread Thunder Stumpges (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15840569#comment-15840569
 ] 

Thunder Stumpges commented on SPARK-19371:
--

Thanks Sean. I don't think I can (always) rely on perfect distribution of the 
underlying data in HDFS and data locality for this, as the number of files (I'm 
using Parquet, but this could vary a lot) and the data source in general could 
differ from case to case. An extreme example might be a query to ElasticSearch 
or Cassandra or another external store that has no locality with the Spark 
executors.

I guess I can agree that you might not want to classify this as a bug, but it 
is at minimum a very important improvement / new feature request, as it can 
make certain workloads prohibitively slow, and create quite unbalanced 
utilization of a cluster on the whole.

I think shuffling is probably the right idea. It seems to me once an RDD is 
cached, a simple "rebalance" could be used to move the partitions around the 
executors in preparation for additional operations.
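
A rough approximation of that "rebalance" with existing APIs only is to build a freshly shuffled copy of the cached data and drop the original; a minimal sketch reusing the df and numPartitions names from the examples above (and, as reported earlier in this thread, the shuffle still does not guarantee even block placement):

{code}
// Sketch only: make a new shuffled copy of the cached data, then release the old blocks.
val rebalanced = df.repartition(numPartitions).persist()
rebalanced.count()   // materialize the new copy
df.unpersist()       // drop the original, unevenly placed blocks
{code}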





[jira] [Created] (SPARK-19371) Cannot spread cached partitions evenly across executors

2017-01-26 Thread Thunder Stumpges (JIRA)
Thunder Stumpges created SPARK-19371:


 Summary: Cannot spread cached partitions evenly across executors
 Key: SPARK-19371
 URL: https://issues.apache.org/jira/browse/SPARK-19371
 Project: Spark
  Issue Type: Bug
Affects Versions: 1.6.1
Reporter: Thunder Stumpges


Before running an intensive iterative job (in this case a distributed topic 
model training), we need to load a dataset and persist it across executors. 

After loading from HDFS and persisting, the partitions are spread unevenly 
across executors (based on the initial scheduling of the reads, which are not 
data-locality aware). The partition sizes are even, just not their 
distribution over executors. We currently have no way to force the partitions 
to spread evenly, and as the iterative algorithm begins, tasks are distributed 
to executors based on this initial load, forcing some very unbalanced work.

This has been mentioned a 
[number|http://apache-spark-developers-list.1001551.n3.nabble.com/RDD-Partitions-not-distributed-evenly-to-executors-tt16988.html#a17059]
 of 
[times|http://apache-spark-user-list.1001560.n3.nabble.com/Spark-work-distribution-among-execs-tt26502.html]
 in 
[various|http://apache-spark-user-list.1001560.n3.nabble.com/Partitions-are-get-placed-on-the-single-node-tt26597.html]
 user/dev group threads.

None of the discussions I could find had solutions that worked for me. Here are 
examples of things I have tried. All resulted in partitions in memory that were 
NOT evenly distributed to executors, causing future tasks to be imbalanced 
across executors as well.

*Reduce Locality*
{code}spark.shuffle.reduceLocality.enabled=false/true{code}

*"Legacy" memory mode*
{code}spark.memory.useLegacyMode = true/false{code}

*Basic load and repartition*
{code}
val numPartitions = 48*16
val df = sqlContext.read.
parquet("/data/folder_to_load").
repartition(numPartitions).
persist
df.count
{code}

*Load and repartition to 2x partitions, then shuffle repartition down to 
desired partitions*
{code}
val numPartitions = 48*16
val df2 = sqlContext.read.
parquet("/data/folder_to_load").
repartition(numPartitions*2)
val df = df2.repartition(numPartitions).
persist
df.count
{code}

It would be great if, when persisting an RDD/DataFrame, we could request that 
those partitions be stored evenly across executors in preparation for future 
tasks. 

I'm not sure if this is a more general issue (i.e. not just involving 
persisting RDDs), but for the persisted in-memory case, it can make a HUGE 
difference in the overall running time of the remaining work.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org