[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193547#comment-15193547
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

Our solution can serve up to 8 user requests per second with a constant 
latency of less than 2 seconds. If user requests increase, throughput rises 
to ~16 requests per second, with latency just above 6 seconds. This 
performance scales linearly as long as the described issue does not materialize. 

So this translates to ~240 users per minute at constant latency, or 480 users 
per minute at higher latency (but still less than 10 seconds). 

PS. As I mentioned above, these are not exactly point queries, so each 
aggregation typically works on less than 2% of the data. Also note that, in 
practice, the 'long operation' is not DNA analysis :) but Spark's Kryo 
serialization and compression of the cached RDD partitions.
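
For reference, a minimal sketch (Java, Spark 1.x API) of the kind of setup described above: Kryo serialization plus a single serialized in-memory copy of the RDD. The class, path and RDD names are illustrative placeholders, not taken from the attached TestIssue.java.

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;

public class KryoCacheSketch {
  public static void main(String[] args) {
    // Kryo serializer, as used for the cached RDD partitions mentioned above
    SparkConf conf = new SparkConf()
        .setAppName("kryo-cache-sketch")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Placeholder data set; the real data lives on MapR-FS in the reported setup
    JavaRDD<String> records = sc.textFile("hdfs:///path/to/records");

    // Cache once, serialized in memory (single replica), as described in the issue
    records.persist(StorageLevels.MEMORY_ONLY_SER);
    records.count(); // force materialization of the cache

    sc.stop();
  }
}
{code}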


> Scheduler "creating" straggler node 
> 
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Spark Core
>Affects Versions: 1.3.1
> Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>Reporter: Ioannis Deligiannis
>Priority: Minor
> Attachments: TestIssue.java, spark_struggler.jpg
>
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication 
> factor of 3.
> * In-memory data are partitioned in 128 chunks (384 cores in total, i.e. 3 
> requests can be executed concurrently(-ish))
> *Action:*
> * Action is a simple sequence of map/filter/reduce. 
> * The action operates upon and returns a small subset of data (following the 
> full map over the data).
> * Data are cached once (1x), serialized in memory with Kryo, so calling the 
> action should not hit the disk under normal conditions.
> * Action network usage is low as it returns a small number of aggregated 
> results and does not require excessive shuffling
> * Under low or moderate load, each action is expected to complete in less 
> than 2 seconds
> *H/W Outlook*
> When the action is called in high numbers, initially the cluster CPU gets 
> close to 100% (which is expected & intended). 
> After a while, cluster utilization drops significantly, with only one 
> (straggler) node at 100% CPU and with a fully utilized network.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O 
> issues; everything is normal under both low and heavy usage. 
> 2. Cluster network usage is very low
> 3. No issues on the Spark UI, except that tasks begin to move from LOCAL to ANY
> *Cause (corrected after finding details in the code):* 
> 1. Node 'H' is doing marginally more work than the rest (being a little 
> slower and at almost 100% CPU)
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns the 
> task to other nodes 
> 3. One of the nodes, 'X', that accepted the task will try to read the data 
> from the HDD of node 'H'. This adds network I/O to node 'H' and also some 
> extra CPU for I/O.
> 4. The time for 'X' to complete increases ~5x as the data goes over the network 
> 5. Eventually, every node has a task that is waiting to fetch that specific 
> partition from node 'H', so the cluster is basically blocked on a single node
> What I managed to figure out from the code is that if an RDD is cached, Spark 
> will use BlockManager.getRemote() and will not recompute the part of the DAG 
> that produced the RDD, hence it always hits the node that has cached the RDD.
> *Proposed Fix*
> I have not worked with the Scala & Spark source code enough to propose a code 
> fix, but on a high level, when a task hits the 'spark.locality.wait' timeout, 
> it could make use of a new configuration, e.g. 
> recomputeRddAfterLocalityTimeout, instead of always trying to fetch the cached 
> RDD. This would be very useful if it could also be set manually on the RDD.
> *Workaround*
> Playing with 'spark.locality.wait' values, there is a deterministic value, 
> depending on partitions and config, at which the problem ceases to exist.
> *PS1*: I don't have enough Scala skills to follow the issue or propose a fix, 
> but I hope that this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of 
> use-cases that trigger this behaviour






[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193515#comment-15193515
 ] 

Sean Owen commented on SPARK-13718:
---

Some SQL queries end up touching lots of data, others just a little; it sounds 
like all of these queries only operate on a single record. I still wouldn't 
describe it as something you would use with high concurrency or low latency at 
scale. RDD.lookup() exists as a convenience but isn't something you'd call at 
scale. 

In fact I am not sure how this kind of point-query system would fare under load 
-- 1 query at a time may be fine; 100 probably not, just given the sheer volume 
of task scheduling.
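
For context, the RDD.lookup() convenience mentioned above lives on pair RDDs; a minimal Java sketch (data and names are illustrative):

{code:java}
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class LookupSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("lookup-sketch").setMaster("local[2]"));

    // Tiny keyed data set standing in for the cached RDD in the discussion
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("a", 1),
        new Tuple2<String, Integer>("b", 2),
        new Tuple2<String, Integer>("a", 3)));

    // lookup() runs a Spark job and collects all values for the key --
    // convenient, but each call is a full job, hence not something to issue at scale
    List<Integer> valuesForA = pairs.lookup("a");
    System.out.println(valuesForA); // [1, 3]

    sc.stop();
  }
}
{code}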



[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193495#comment-15193495
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

I think I understand how you got to that conclusion, and it would be hard to 
communicate the message without a whiteboard. Even though our use-case could 
partially be addressed with NoSQL, Spark is the right fit and works well, as it 
addresses batch requests as well as interactive requests (with less than 
2-second latency).

Regarding the point lookup, it is partly true, as we are using a small subset 
to serve each ad-hoc request. But isn't this what many Spark SQL users do as 
well, and the reason for having rdd.lookup()? This issue will cause the same 
effect if run using Spark SQL on cached RDDs; it is probably happening already, 
but it is more complicated to debug through Spark SQL.

Even though Spark works better for across-the-board groupings/aggregations, 
iterative algorithms & ML, performing well for distributed requests on skewed 
inputs cannot be considered out of scope (especially given the investment in 
Spark SQL). 

In any case, thanks for taking the time to respond.

PS. I believe that this will soon be addressed, since users are naturally 
trying to use a single platform for both batch and interactive access. 



[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193389#comment-15193389
 ] 

Sean Owen commented on SPARK-13718:
---

Yeah, this is the problem. This is not at all a good application of Spark. You 
don't have a distributed computation problem; you're using it for point lookups 
and relatively quick processing for a synchronous response. This is much more 
what a NoSQL store plus app servers are for.

What you're suggesting doesn't even do much to help this situation.

I'm going to close this, not because it's not a good discussion, but because I 
think it's well away from the original point and is motivated by a usage 
pattern that's never going to scale well on Spark.



[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193378#comment-15193378
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

(reply per paragraph)

To provide a bit more context: I am not talking about point lookups. Assume 
that you want your users to analyze DNA ad hoc, and say your RDD contains one 
record per mammal (e.g. dinosaur, butterfly, worm). If you have an interactive 
API (e.g. Spark SQL, REST-based, or whatever), most user "queries" will focus 
on dinosaurs. Say the (cached) dinosaur DNA record is 10 GB. The "query" can be 
loosely written as mammalRDD.filter(id=="Dinosaurs").map(processDNA).collect();
So what will the Spark scheduler do? It will assign a task to process each and 
every partition; however, the executor processing the dinosaur record will take 
10 minutes, whereas the executors working on the other records will finish in 
milliseconds. Because 'Dinosaur' is a popular subject, 95% of user queries will 
be scheduled on the one node that holds the cached partition. So the next user 
will occupy an executor core for 10 minutes, and so on, until the executor that 
holds the cached data cannot accept any new tasks. The scheduler will then 
allocate a remote executor, which will request the 10 GB from the executor that 
contains the cached partition. You can see that, at this point, once the 
scheduler starts assigning tasks to other executors, your cluster goes to a 
"dark place" where it performs horribly, primarily because it does not use its 
resources efficiently. 
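
A minimal Java sketch of that access pattern, assuming a simple record type with an id field and an expensive processDNA step; every name here (HotPartitionSketch, Record, the path) is an illustrative placeholder and not taken from the attached TestIssue.java:

{code:java}
import java.io.Serializable;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;

public class HotPartitionSketch {

  // Placeholder record type: one (large) entry per mammal
  public static class Record implements Serializable {
    public String id;
    public byte[] dna;
  }

  // Stand-in for the expensive per-record work ("processDNA" in the text);
  // for the hot 'Dinosaurs' record the real work takes ~10 minutes
  static String processDNA(Record r) {
    return r.id + ":" + r.dna.length;
  }

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("hot-partition-sketch"));

    // In the reported setup the RDD is loaded once and cached serialized in memory
    JavaRDD<Record> mammalRDD = sc.objectFile("hdfs:///path/to/mammals");
    mammalRDD.persist(StorageLevels.MEMORY_ONLY_SER);

    // Every interactive request runs a full filter/map/collect, but nearly all of
    // the work lands on the one executor holding the cached 'Dinosaurs' partition
    List<String> result = mammalRDD
        .filter(r -> "Dinosaurs".equals(r.id))
        .map(HotPartitionSketch::processDNA)
        .collect();

    System.out.println(result);
    sc.stop();
  }
}
{code}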

Eventually, all of the RDD is required, but for that 1x replication is fine. 
What would work is for *only* the hot partition to be replicated more than once. 

Agreed, but here I was making the case both for normal caching and for, let's 
call it, "hot-replication". The caching algorithms are related and could be shared.

As I tried to illustrate with the example above, there are natural use-cases 
that use Spark both for querying and for distributed processing. I believe that 
this will progress, as it affects use-cases many of which concern financial 
institutions. If Spark is to be a leader in interactive big-data processing 
(user-driven requests), then this will have to be addressed. If you have a look 
at SO or even user@, there is a growing number of people trying to do this. 
Also, on SO I have noticed a few cases where I am pretty sure that the 
underlying factor is this limitation.

You are right, I keep talking about imbalanced task processing times and 
linking them to data locality, because (cache) locality is the main cause. If I 
do not cache my RDD, the Spark scheduler will use the `preferred locations` to 
schedule a task, whereas when you cache it, it will only use the cached 
partition's owner as the location to fetch from. My intention is not to make 
something slow faster; I have a workaround for this. My ultimate goal here is 
the ability to utilize my cluster resources in a user-driven interactive 
environment. Since user requests are (almost) always skewed (or closer to a 
normal distribution), this issue is bound to happen. If Spark were only meant 
to be a batch environment, then this would probably have little effect.


[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193326#comment-15193326
 ] 

Sean Owen commented on SPARK-13718:
---

This is probably better as a discussion on user@. A more general point here is 
that an RDD is not at all suitable for point-lookups like this at any 
significant scale, but maybe that's separate or just the example you chose here 
to illustrate.

Caching the whole RDD if you'll only ever access X is wasteful of course. But I 
presume X varies. If any record may be accessed, all partitions should be 
cached.

Different cache eviction policies are an interesting topic, but another topic 
yet again.

Yes, I understand you want partitions to be replicated extra times dynamically. 
I don't think you're addressing the complications I highlighted above, though 
that would come in some kind of design doc later. Still, it matters when 
reasoning about whether this is likely to be worth pursuing.

You keep talking about imbalanced task processing times but attaching it to 
data locality. Of course, non-local reads are slower, but I don't think that's 
the problem you're getting at. Right now you want a task to be able to schedule 
more freely exactly because its data locality doesn't matter much -- right? 
That's why you picked the Fibonacci example? In which case, I don't think you 
need to do anything, because the task will already eventually schedule 
somewhere and non-locality won't matter.


[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193287#comment-15193287
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

(reply per paragraph)

That is the point I am trying to get across. An RDD operation will access all 
partitions, but the processing weight is not the same for every data partition, 
which is why I gave the Fibonacci example. Since scheduling is bound to RDD 
partition locality, it underperforms on many occasions. What you describe holds 
true when data and processing are even, which is not the case in most real-time 
(i.e. interactive) applications, as requests are user-driven and look more like 
a normal distribution (so you will get hot partitions).

Let me try to explain how this translates. My cached RDD is 80 GB, containing a 
billion records. One specific partition (out of thousands) contains a value, 
say 'X'. In pseudo-code the logic looks like: rdd.filter(f=='X').map("very 
expensive op").reduce(...). So user queries will quickly go over the cached 
partitions and not really do anything, except in the case of 'X'. So, are we 
really suggesting that using an extra 80 GB is logical, practical or 
cost-effective? Is there a way to actually make the attached example really 
scale? (Note that the cluster is not utilized by external processes but by this 
code itself.)

The way I can imagine this working is to add it at the RDD level. Similarly to 
`persist(MEMORY_ONLY)`, attach caching policies. These could be interfaced so 
that even a user can attach an eviction policy to an RDD. Out of the box, the 
basics would be nice (LRU, RR, MRU, LFU), which could also apply to the way 
Spark currently evicts partitions. In terms of replicas, an API like 
`rdd.setMaxDynamicReplication(int x)` would also help optimize scheduling. 
Finally, for cases where recreating the cached RDD is cheap/simple, we could 
use something like `rdd.onNonLocality(Fetch/ReCreate)` to either fetch from 
memory (the way it works now when the RDD is cached) or recreate it (the way it 
works now when the RDD is not cached and replicas > 1). Note that the proposed 
names and available options are only illustrative; they try to set the basis 
for how this could work. 
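
Purely to make the proposal concrete, here is a rough Java sketch of what such a (hypothetical) API surface might look like. None of these interfaces, enums or methods exist in Spark; the names simply follow the illustrative ones above.

{code:java}
// Hypothetical API sketch only -- nothing below exists in Spark today.

/** Pluggable eviction policy a user could attach to a cached RDD (LRU, RR, MRU, LFU, ...). */
interface CacheEvictionPolicy {
  /** Choose which cached block to drop when memory is needed. */
  String chooseBlockToEvict(java.util.List<String> cachedBlockIds);
}

/** What the scheduler should do when no data-local slot is free. */
enum NonLocalityAction { FETCH, RECREATE }

/** Imaginary handle exposing the per-RDD knobs discussed in this comment. */
interface TunableCachedRdd<T> {
  /** Attach a user-supplied eviction policy instead of Spark's default behaviour. */
  TunableCachedRdd<T> withEvictionPolicy(CacheEvictionPolicy policy);

  /** Upper bound on how many extra in-memory copies of a hot partition may be created. */
  TunableCachedRdd<T> setMaxDynamicReplication(int maxReplicas);

  /** On a locality timeout, either fetch the remote cached block or recompute the partition locally. */
  TunableCachedRdd<T> onNonLocality(NonLocalityAction action);
}
{code}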






[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193167#comment-15193167
 ] 

Sean Owen commented on SPARK-13718:
---

It's not so much that users need finer-grained per-partition caching control, 
since most RDD operations use all partitions every time (excepting special 
cases like "first"). For this reason, I think that this kind of idea is 
basically equivalent to dynamically increasing replication of the whole RDD.

The issue here is having cached data available only on busy nodes. You can 
control this a little bit by asking for 2x replication, so that's already 
possible. There's no 3x setting right now. The benefit starts to decrease 
though. This would only help when, generally, over 2/3 of nodes are fully 
utilized (so that you might frequently not find any slot available next to one 
of 2 replicas, but might find a slot next to a 3rd replica), but the cluster is 
not fully utilized (or else there are just no free slots anyway).

The problem is not so much this idea but the questions it further invites: when 
do you un-persist extra replicas? When do you stop replicating? Do you wait for 
N seconds before making a replica? You just get back to a thicker forest of 
config options, I think, since so much "depends". I'm having trouble seeing a 
win for the general case here.
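
For reference, the 2x in-memory replication mentioned above is selected through the storage level at persist time; a minimal Java sketch (the data source is a placeholder):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;

public class TwoReplicaCacheSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("two-replica-cache-sketch"));

    JavaRDD<String> records = sc.textFile("hdfs:///path/to/records");

    // Serialized in-memory cache with 2 replicas of every cached partition.
    // This is a whole-RDD setting; the predefined levels stop at 2 replicas
    // and there is no per-partition variant.
    records.persist(StorageLevels.MEMORY_ONLY_SER_2);
    records.count(); // materialize the cache on two nodes per partition

    sc.stop();
  }
}
{code}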


[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193122#comment-15193122
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

That was my point above about partition replication (maybe not explained that 
clearly). An RDD is cached and managed per partition by Spark, but a user can 
only set the replication factor on the whole RDD. So, if I wanted to replicate 
my 'hot-partition' 10 times, I would need to replicate every RDD partition, 
which is a waste of resources.

Moving the data from a busy node once is fine (or there could be a flag to 
re-create the RDD partition from another preferred location). What makes this a 
problem is that we continuously hit the same cached partition. Running the 
example, every node in the cluster will be requesting & waiting for this 
partition. Adding more replicas of this one partition would solve this and not 
waste resources.
The rest of the "downsides" you mention are the ones I referred to above with 
"...product of RDD configuration and not default behavior as they would cause 
other issues...". 

Fibonacci was an example of how you can skew the CPU load towards a 
"hot-partition", not a way to reproduce the issue (the example that reproduces 
it is attached).


I'd be happy to provide any help I can, but my Scala skills and knowledge of 
Spark internals are too limited for me to provide a design doc or code this. 


[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193074#comment-15193074
 ] 

Sean Owen commented on SPARK-13718:
---

(PS RDDs are already cached per-partition and can be replicated more than once.)

Yes, but this has downsides too. Copying the cached data means serializing all 
of the data over the network anyway, so nothing is gained there. It also means 
putting it into memory on the other node. That might or might not be useful; it 
depends on whether the partition is heavily accessed in the future. But it also 
doesn't help the original problem here, since you're still proposing copying 
data off the busy node. So I am not clear this particular line of reasoning is 
helping this case.

In the Fibonacci case, data locality isn't a problem anyway. It's just skewed 
processing over evenly-distributed data.

Something like "smart partition replication" could be useful. It raises a lot 
of other questions: what if all partitions are hot, how do you prioritize, 
where do you put the partitions, when you stop replicating, when is replicating 
worse than reading from local storage once, etc. That is, I think it's got to 
be pretty complex. 

If this is headed towards a detailed design doc, OK, otherwise I think this 
should be closed.


[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-14 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193046#comment-15193046
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

It would be hard, if not impossible, to automate scheduling of such tasks; 
however, exposing scheduling APIs would make such cases easily manageable. In 
this case the following behavior would make things a lot better (note that 
these should be the product of RDD configuration and not default behavior, as 
they would cause other issues):

Potential solution:
When a data-local node is CPU-bound and a cached partition is requested, that 
partition would also be cached on the receiving node, making it available to 
other nodes as well, effectively increasing the data-local resources for that 
RDD partition (i.e. instead of replicating the whole RDD x2 in memory, allow 
this on a per-partition basis and on (high) demand).

What would the above solve?
When you cache an RDD in memory (1 copy), you effectively reduce your data 
locality from HDFS's n replicas to 1. Even if the cached RDD is simply an 
in-memory copy of the data with little or no transformation, from the moment 
you cache it, only one node is "data-local". Currently there is no way to deal 
with this efficiently. You can optimize for performance, but you cannot make 
better use of your cluster resources. So, by allowing a specific 
"hot-partition" to be dynamically replicated across the cluster, we can use our 
resources more efficiently. (To make an analogy, this works the same way 
YouTube replicates videos that are in high demand.)

PS1. When referring to a "hot-partition", I mean a partition that uses more 
resources. Think of an int RDD to which you apply a filter (say x==1000) and 
then run a Fibonacci function. Even if your data are evenly partitioned, your 
processing might be skewed.

PS2. I have attached a design doc that explains the problem better and a Java 
class that reproduces it. Should be easier to explain the issue.
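
A minimal Java sketch of the skew described in PS1, where a deliberately slow recursive Fibonacci stands in for the expensive work and the matching values are concentrated in one partition (all names and values are illustrative, not from the attached reproducer):

{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SkewedProcessingSketch {

  // Deliberately slow recursive Fibonacci: the "expensive" part of the work
  static long fib(int n) {
    return n <= 1 ? n : fib(n - 1) + fib(n - 2);
  }

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("skewed-processing-sketch").setMaster("local[4]"));

    // 128 partitions of 1000 ints each; the value 1000 only occurs in the first
    // 1000 elements, so every match ends up in a single ("hot") partition
    List<Integer> data = new ArrayList<>();
    for (int i = 0; i < 128_000; i++) {
      data.add(i < 1_000 ? 1_000 : 0);
    }
    JavaRDD<Integer> ints = sc.parallelize(data, 128).cache();

    long sum = ints
        .filter(x -> x == 1_000)   // cheap for the 127 partitions with no matches
        .map(x -> fib(35))         // expensive only where matches exist: one slow task
        .reduce((a, b) -> a + b);  // small aggregated result

    System.out.println("sum = " + sum);
    sc.stop();
  }
}
{code}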


[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-13 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192309#comment-15192309
 ] 

Sean Owen commented on SPARK-13718:
---

What do you mean that it tries to assign the task without an available core 
(slot)? I understand that if the data-local nodes are all busy, and the task is 
I/O-intensive, then reading remotely is not only going to be slower but will 
also put more load on the busy nodes. But they may not be I/O-bound; they may 
just have all slots occupied. Or the job may not be I/O-intensive at all, in 
which case data locality doesn't help. In that case, not scheduling the task is 
suboptimal.

But when is it better to not schedule the task at all? You're saying it 
creates a straggler, but all you're saying is that things take a while when 
resources are constrained. What is the better scheduling decision, even given 
omniscience?

> Scheduler "creating" straggler node 
> 
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Spark Core
>Affects Versions: 1.3.1
> Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>Reporter: Ioannis Deligiannis
>Priority: Minor
> Attachments: TestIssue.java, spark_struggler.jpg
>
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication 
> factor of 3.
> * In-memory data are partitioned in 128 chunks (384 cores in total, i.e. 3 
> requests can be executed concurrently(-ish) )
> *Action:*
> * Action is a simple sequence of map/filter/reduce. 
> * The action operates upon and returns a small subset of data (following the 
> full map over the data).
> * Data are 1 x cached serialized in memory (Kryo), so calling the action  
> should not hit the disk under normal conditions.
> * Action network usage is low as it returns a small number of aggregated 
> results and does not require excessive shuffling
> * Under low or moderate load, each action is expected to complete in less 
> than 2 seconds
> *H/W Outlook*
> When the action is called in high numbers, initially the cluster CPU gets 
> close to 100% (which is expected & intended). 
> After a while, the cluster utilization reduces significantly with only one 
> (struggler) node having 100% CPU and fully utilized network.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O 
> issues and everything is normal under low or heavy usage. 
> 2. Cluster network usage is very low
> 3. No issues on Spark UI except that tasks begin to  move from LOCAL to ANY
> *Cause (corrected after finding details in the code):*
> 1. Node 'H' is doing marginally more work than the rest (being a little
> slower and at almost 100% CPU)
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns
> the task to other nodes
> 3. One of the nodes 'X' that accepted the task will try to access the data
> from node 'H''s HDD. This adds network I/O to the node and also some extra
> CPU for I/O.
> 4. 'X''s time to complete increases ~5x as the data goes over the network.
> 5. Eventually, every node will have a task that is waiting to fetch that
> specific partition from node 'H', so the cluster is basically blocked on a
> single node.
> What I managed to figure out from the code is that this happens because, if
> an RDD is cached, Spark will make use of BlockManager.getRemote() and will
> not recompute the part of the DAG that produced this RDD, and hence always
> hits the node that has cached the RDD.
> *Proposed Fix*
> I have not worked with the Scala & Spark source code enough to propose a
> code fix, but on a high level, when a task hits the 'spark.locality.wait'
> timeout, it could make use of a new configuration, e.g.
> recomputeRddAfterLocalityTimeout, instead of always trying to fetch the
> cached RDD (see the sketch after this quoted description). This would be
> very useful if it could also be set manually on the RDD.
> *Workaround*
> Playing with 'spark.locality.wait' values, there is a deterministic value,
> depending on partitions and config, at which the problem ceases to exist.
> *PS1*: I don't have enough Scala skills to follow the issue or propose a
> fix, but I hope that this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of
> use-cases that trigger this behaviour.
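
For illustration, the proposed knob might look roughly like the following from
application code. This is only a sketch: the
spark.rdd.recomputeAfterLocalityTimeout key and the commented-out per-RDD
setter are hypothetical names invented here to make the proposal concrete;
neither exists in Spark.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Sketch only: 'spark.rdd.recomputeAfterLocalityTimeout' is the hypothetical
// configuration proposed in this ticket; it does not exist in Spark.
object RecomputeOnLocalityTimeoutSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("recompute-on-locality-timeout-sketch")
      .set("spark.locality.wait", "3000")                     // 1.3.x default, in millis
      .set("spark.rdd.recomputeAfterLocalityTimeout", "true") // hypothetical key

    val sc = new SparkContext(conf)

    val cached = sc
      .textFile("maprfs:///data/events")      // hypothetical input path
      .map(_.split(','))
      .persist(StorageLevel.MEMORY_ONLY_SER)  // cached once, serialized in memory

    // Hypothetical per-RDD override mentioned in the proposal; no such method exists:
    // cached.recomputeAfterLocalityTimeout(enabled = true)

    println(cached.filter(_.length > 1).count())
    sc.stop()
  }
}
{code}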






[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-07 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15182902#comment-15182902
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

True, tuning {{spark.locality.wait}} yields better performance, and the right
value is application specific (a configuration sketch follows this comment).
However, in essence the issue is that the scheduler assigns the task elsewhere
because the data-local node does not have an available core (I believe).

When the task is assigned to another node and that node tries to access
resources on the originally 'busy' node, in most cases it will incur longer
wait times. Since the data is also available on the other (replicas - 1)
nodes, reading from one of those has a better (or equal) chance of getting
better results.

Just to be clear, the issue is not about whether non-locality is faster or
not; it is that newly scheduled tasks hit the same node that could not accept
the task to begin with, creating a straggler.
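
For reference, a minimal sketch of the {{spark.locality.wait}} tuning being
discussed, assuming it is done via SparkConf at application start-up. The keys
below do exist in Spark; the values are purely illustrative, and the right
numbers depend on partition count, task duration and cluster load.

{code:scala}
import org.apache.spark.SparkConf

// Illustrative locality-wait tuning; values must be tuned per application.
object LocalityWaitTuningSketch {
  val conf = new SparkConf()
    .setAppName("locality-wait-tuning-sketch")
    // Base wait before the scheduler relaxes the preferred locality level
    // (default is 3000 ms in Spark 1.3.x).
    .set("spark.locality.wait", "10000")
    // Optional per-level overrides:
    .set("spark.locality.wait.process", "10000") // PROCESS_LOCAL -> NODE_LOCAL
    .set("spark.locality.wait.node", "10000")    // NODE_LOCAL    -> RACK_LOCAL
    .set("spark.locality.wait.rack", "10000")    // RACK_LOCAL    -> ANY
}
{code}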

> Scheduler "creating" straggler node 
> 
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Spark Core
>Affects Versions: 1.3.1
> Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>Reporter: Ioannis Deligiannis
>Priority: Minor
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication 
> factor of 3.
> * In-memory data are partitioned in 128 chunks (384 cores in total, i.e. 3
> requests can be executed concurrently(-ish))
> *Action:*
> * Action is a simple sequence of map/filter/reduce. 
> * The action operates upon and returns a small subset of data (following the 
> full map over the data).
> * Data are cached once, serialized in memory (Kryo), so calling the action
> should not hit the disk under normal conditions.
> * Action network usage is low as it returns a small number of aggregated 
> results and does not require excessive shuffling
> * Under low or moderate load, each action is expected to complete in less 
> than 2 seconds
> *H/W Outlook*
> When the action is called at high volume, initially the cluster CPU gets
> close to 100% (which is expected & intended).
> After a while, cluster utilization drops significantly, with only one
> (straggler) node at 100% CPU and with a fully utilized network.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O 
> issues and everything is normal under low or heavy usage. 
> 2. Cluster network usage is very low
> 3. No issues on the Spark UI except that tasks begin to move from LOCAL to ANY
> *Cause:*
> 1. Node 'H' is doing marginally more work than the rest (being a little
> slower and at almost 100% CPU)
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns
> the task to other nodes (in some cases it will assign at NODE level, which
> means loading from HDD, and then follow the sequence and fall back to ANY)
> 3. One of the nodes 'X' that accepted the task will eventually try to access
> the data from node 'H''s HDD. This adds HDD and network I/O to the node and
> also some extra CPU for I/O.
> 4. 'X''s time to complete increases ~5x as it involves HDD/network I/O.
> 5. Eventually, every node has a task that is waiting to fetch that specific
> partition from node 'H', so the cluster is basically blocked on a single node.
> *Proposed Fix*
> I have not worked with Scala enough to propose a code fix, but on a high
> level, when a task hits the 'spark.locality.wait' timeout, it should provide
> a 'hint' to the node accepting the task to use a data-source 'replica' that
> is not on the node that failed to accept the task in the first place (see
> the sketch after this quoted description).
> *Workaround*
> Playing with 'spark.locality.wait' values, there is a deterministic value,
> depending on partitions and config, at which the problem ceases to exist.
> *PS1*: I don't have enough Scala skills to follow the issue or propose a
> fix, but I hope that this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of
> use-cases that trigger this behaviour.
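
To make the proposed 'hint' concrete, it could be as simple as re-ordering the
candidate replica locations for the underlying file block so that the node
which just hit the locality timeout is tried last. The helper below is a
self-contained sketch of that selection logic only; the object, method and
host names are hypothetical, and this is not how Spark's BlockManager is
actually wired.

{code:scala}
import scala.util.Random

// Hypothetical sketch of the replica 'hint'; not Spark internals code.
object ReplicaHintSketch {

  /** Order candidate replica hosts for a block read so that the host which
    * failed to accept the task (the overloaded node) is tried last. */
  def orderReplicas(replicaHosts: Seq[String], busyHost: Option[String]): Seq[String] = {
    val (busy, healthy) = replicaHosts.partition(host => busyHost.contains(host))
    Random.shuffle(healthy) ++ busy // spread reads across the healthy replicas
  }

  def main(args: Array[String]): Unit = {
    // With a replication factor of 3, the partition held on busy node 'H' is
    // also readable from two other replicas (illustrative host names).
    val order = orderReplicas(Seq("nodeH", "nodeB", "nodeE"), busyHost = Some("nodeH"))
    println(order.mkString(" -> ")) // e.g. nodeB -> nodeE -> nodeH
  }
}
{code}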






[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node

2016-03-07 Thread Ioannis Deligiannis (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15182901#comment-15182901
 ] 

Ioannis Deligiannis commented on SPARK-13718:
-

Point taken, though I'd rank it higher than Minor, since it severely affects
non-batch applications (which, in application terms, would be considered a
bug). In any case, do you think this would be better placed on the mailing
list?

> Scheduler "creating" straggler node 
> 
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler, Spark Core
>Affects Versions: 1.3.1
> Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>Reporter: Ioannis Deligiannis
>Priority: Minor
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication 
> factor of 3.
> * In-memory data are partitioned in 128 chunks (384 cores in total, i.e. 3
> requests can be executed concurrently(-ish))
> *Action:*
> * Action is a simple sequence of map/filter/reduce. 
> * The action operates upon and returns a small subset of data (following the 
> full map over the data).
> * Data are cached once, serialized in memory (Kryo), so calling the action
> should not hit the disk under normal conditions.
> * Action network usage is low as it returns a small number of aggregated 
> results and does not require excessive shuffling
> * Under low or moderate load, each action is expected to complete in less 
> than 2 seconds
> *H/W Outlook*
> When the action is called at high volume, initially the cluster CPU gets
> close to 100% (which is expected & intended).
> After a while, cluster utilization drops significantly, with only one
> (straggler) node at 100% CPU and with a fully utilized network.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O 
> issues and everything is normal under low or heavy usage. 
> 2. Cluster network usage is very low
> 3. No issues on the Spark UI except that tasks begin to move from LOCAL to ANY
> *Cause:*
> 1. Node 'H' is doing marginally more work than the rest (being a little
> slower and at almost 100% CPU)
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns
> the task to other nodes (in some cases it will assign at NODE level, which
> means loading from HDD, and then follow the sequence and fall back to ANY)
> 3. One of the nodes 'X' that accepted the task will eventually try to access
> the data from node 'H''s HDD. This adds HDD and network I/O to the node and
> also some extra CPU for I/O.
> 4. 'X''s time to complete increases ~5x as it involves HDD/network I/O.
> 5. Eventually, every node has a task that is waiting to fetch that specific
> partition from node 'H', so the cluster is basically blocked on a single node.
> *Proposed Fix*
> I have not worked with Scala enough to propose a code fix, but on a high
> level, when a task hits the 'spark.locality.wait' timeout, it should provide
> a 'hint' to the node accepting the task to use a data-source 'replica' that
> is not on the node that failed to accept the task in the first place.
> *Workaround*
> Playing with 'spark.locality.wait' values, there is a deterministic value,
> depending on partitions and config, at which the problem ceases to exist.
> *PS1*: I don't have enough Scala skills to follow the issue or propose a
> fix, but I hope that this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of
> use-cases that trigger this behaviour.


