[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193547#comment-15193547 ] Ioannis Deligiannis commented on SPARK-13718:
-
Our solution can serve up to 8 user requests per second with a constant latency of under 2 seconds. If user requests increase, throughput rises to ~16 users per second with latency just above 6 seconds. This performance scales linearly when the described issue does not materialize. So this translates to ~240 users per minute at constant latency, or 480 users per minute at the higher latency (but less than 10 seconds).

PS. As I mentioned above, these are not exactly point queries, so each aggregation typically works on less than 2% of the data. Also note that in practice the 'long operation' is not DNA analysis :) but Spark's Kryo serialization and compression of the cached RDD partitions.

> Scheduler "creating" straggler node
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler, Spark Core
> Affects Versions: 1.3.1
> Environment: Spark 1.3.1, MapR-FS, single rack, standalone mode scheduling, 8-node cluster, 48 cores & 512 GB RAM per node, data replication factor of 3. Each node has 4 Spark executors configured with 12 cores and 22 GB of RAM each.
> Reporter: Ioannis Deligiannis
> Priority: Minor
> Attachments: TestIssue.java, spark_struggler.jpg
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication factor of 3.
> * In-memory data are partitioned into 128 chunks (384 cores in total, i.e. 3 requests can be executed concurrently(-ish)).
>
> *Action:*
> * The action is a simple sequence of map/filter/reduce.
> * The action operates upon and returns a small subset of the data (following the full map over the data).
> * Data are cached once, serialized in memory (Kryo), so calling the action should not hit the disk under normal conditions.
> * Action network usage is low, as it returns a small number of aggregated results and does not require excessive shuffling.
> * Under low or moderate load, each action is expected to complete in less than 2 seconds.
>
> *H/W Outlook:*
> When the action is called in high numbers, initially the cluster CPU gets close to 100% (which is expected & intended).
> After a while, cluster utilization drops significantly, with only one (straggler) node at 100% CPU and a fully utilized network.
>
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O issues; everything is normal under low or heavy usage.
> 2. Cluster network usage is very low.
> 3. No issues on the Spark UI, except that tasks begin to move from LOCAL to ANY.
>
> *Cause (corrected with details found in the code):*
> 1. Node 'H' is doing marginally more work than the rest (being a little slower and at almost 100% CPU).
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns the task to other nodes.
> 3. One of the nodes 'X' that accepted the task will try to read the data from node 'H''s disk. This adds network I/O to node 'H', plus some extra CPU for the I/O.
> 4. 'X''s time to complete increases ~5x as the data goes over the network.
> 5. Eventually, every node has a task waiting to fetch that specific partition from node 'H', so the cluster is essentially blocked on a single node.
>
> What I managed to figure out from the code is that if an RDD is cached, Spark will use BlockManager.getRemote() and will not recompute the part of the DAG that produced the RDD, hence it always hits the node that cached it.
>
> *Proposed Fix:*
> I have not worked with the Scala & Spark source code enough to propose a code fix, but at a high level, when a task hits the 'spark.locality.wait' timeout, it could consult a new configuration, e.g. recomputeRddAfterLocalityTimeout, instead of always fetching the cached RDD.
> This would be very useful if it could also be set manually on the RDD.
>
> *Workaround:*
> Playing with 'spark.locality.wait' values, there is a deterministic value, depending on partitions and config, at which the problem ceases to exist.
>
> *PS1:* I don't have enough Scala skills to follow the issue or propose a fix, but I hope this has enough information to make sense.
> *PS2:* Debugging this issue made me realize that there can be a lot of use-cases that trigger this behaviour.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
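The five-step cause chain in the issue description amounts to simple arithmetic: a data-local task that normally takes ~2 seconds instead waits out the 3000 ms spark.locality.wait and then runs ~5x slower on a remote node. A minimal sketch (the 2 s, 3 s and 5x figures come from the report; the model itself is illustrative):

```python
# Toy model of the reported cascade: one node 'H' holds the only cached
# copy of a hot partition. A local task takes 2 s; once H is saturated
# the scheduler waits out spark.locality.wait (3 s by default) and then
# runs the task on a remote node, which still fetches the partition from
# H over the network and runs ~5x slower (figures from the report above).

LOCAL_TASK_S = 2.0      # normal task time on the data-local node
LOCALITY_WAIT_S = 3.0   # default spark.locality.wait
REMOTE_PENALTY = 5.0    # reported slowdown for a non-local task

def task_latency(h_is_free: bool) -> float:
    """Latency of one task against the hot partition."""
    if h_is_free:
        return LOCAL_TASK_S
    # Scheduler gives up on locality; the task runs remotely but still
    # pulls the partition from H's disk and network.
    return LOCALITY_WAIT_S + LOCAL_TASK_S * REMOTE_PENALTY

print(task_latency(True))   # 2.0  -- healthy, data-local case
print(task_latency(False))  # 13.0 -- the straggler case
```

Under these assumed numbers, every task that loses locality jumps from 2 s to 13 s, which is consistent with the reported transition of tasks from LOCAL to ANY followed by cluster-wide slowdown.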
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193515#comment-15193515 ] Sean Owen commented on SPARK-13718:
---
Some SQL queries end up touching lots of data, others just a little; it sounds like all of these queries operate on only a record. I still wouldn't describe it as something you would use with high concurrency or low latency at scale. RDD.lookup() exists as a convenience but isn't something you'd call at scale. In fact I am not sure how this kind of point-query system would fare under load -- 1 query at a time may be fine; 100 probably not, just given the sheer volume of task scheduling.
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193495#comment-15193495 ] Ioannis Deligiannis commented on SPARK-13718:
-
I think I understand how you got to that conclusion, and it would be hard to communicate the message without a whiteboard. Even though our use-case could partially be addressed with NoSQL, Spark is the right fit and works well, as it addresses batch requests as well as interactive requests (with less than 2 seconds of latency).

Regarding the point lookup, it is partly true, as we are using a small subset to serve each ad-hoc request. But isn't this what many Spark SQL users do as well, and the reason for having rdd.lookup()? This issue will cause the same effect if run using Spark SQL on cached RDDs; it is probably happening already, but it is more complicated to debug Spark SQL. Even though Spark works better for across-the-board groupings/aggregations, iterative algorithms & ML, performing well for distributed requests on skewed inputs cannot be considered out of scope (especially given the investment in Spark SQL).

In any case, thanks for taking the time to respond.

PS. I believe that this will soon be addressed, since users are naturally trying to use a single platform for both batch and interactive access.
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193389#comment-15193389 ] Sean Owen commented on SPARK-13718:
---
Yeah, this is the problem. This is not at all a good application of Spark. You don't have a distributed computation problem; you're using it for point lookups and relatively quick processing for a synchronous response. This is much more what a NoSQL store + app servers are for. What you're suggesting doesn't do much to help this situation either. I'm going to close this, not because it's not a good discussion, but because I think it's well away from the original point, and motivated by a usage pattern that's never going to scale well on Spark.
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193378#comment-15193378 ] Ioannis Deligiannis commented on SPARK-13718:
-
(reply per paragraph)
To provide a bit more context: I am not talking about point lookups. Assume you want your users to ad-hoc analyze DNA, and say your RDD contains one record per mammal (e.g. dinosaur, butterfly, worm). If you have an interactive API (e.g. Spark SQL, REST-based or whatever), most user "queries" will focus on dinosaurs. Say the dinosaur DNA record (cached) is 10 GB. The "query" can be loosely written as mammalRDD.filter(id=="Dinosaurs").map(processDNA).collect(). So what will the Spark scheduler do? It will assign an executor to process each RDD partition; however, the executor processing the dinosaur record will take 10 minutes, whereas the executors working on the other records will finish in milliseconds. Because 'Dinosaur' is a popular subject, 95% of your user queries will be scheduled to the one node that has the cached partition. So the next user will tie up an executor core for 10 minutes, and so on, until the executor that has the cached data cannot accept any new tasks. The scheduler will then allocate a remote executor, which will request 10 GB from the one that holds the cached partition. You can see that once the scheduler starts assigning tasks to other executors, your cluster goes to a "dark place" where it performs horribly, primarily because it does not use its resources efficiently.

Eventually, all of the RDD is required, but 1x replication is fine. What would work is for -only- the hot partition to be replicated more than once.

Agreed, but here I was making the case for both normal caching and, let's call it, "hot replication". The caching algorithms are related and could be shared.
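The skew in the example above can be sketched with a toy simulation; the 95% figure, 128 partitions and 8 nodes come from the thread, while the partition-to-node mapping and query count are made up for illustration:

```python
import random
from collections import Counter

random.seed(42)  # deterministic for illustration

# 128 cached partitions spread evenly over 8 nodes, as in the issue
# description; the popular 'Dinosaur' record lives in one partition.
PARTITION_OWNERS = {f"p{i}": f"node{i % 8}" for i in range(128)}
HOT_PARTITION = "p0"

def pick_partition() -> str:
    """95% of ad-hoc queries hit the hot partition, the rest are uniform."""
    if random.random() < 0.95:
        return HOT_PARTITION
    return random.choice(list(PARTITION_OWNERS))

# With strict cache locality, each query queues on the node that owns
# its partition, so almost all load lands on one node.
load = Counter(PARTITION_OWNERS[pick_partition()] for _ in range(1000))
busiest, hits = load.most_common(1)[0]
print(busiest)  # node0 -- the owner of the hot partition takes ~95% of the load
```

The simulation only restates the comment's point: under a skewed query distribution, locality-bound scheduling turns the hot partition's owner into the straggler while the other seven nodes sit nearly idle.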
As I tried to illustrate with the example above, there are natural use-cases that use Spark both for querying and for distributed processing. I believe this will progress, as it affects use-cases, many of which concern financial institutions. If Spark is to be a leader in interactive big-data processing (user-driven requests), then this will have to be addressed. If you have a look at SO or even user@, there is a growing number of people trying to do this. Also, on SO I have noticed a few cases where I am pretty sure the underlying factor is this limitation.

You are right: I keep talking about imbalanced task processing times and linking them to data locality, because (cache) locality is the main cause. If I do not cache my RDD, the Spark scheduler will use the `preferred locations` to schedule a task, whereas when you cache, it will only use the cached partition's owner as the location to fetch from.

My intention is not to make something slow faster; I have a workaround for this. My ultimate goal here is the ability to utilize my cluster resources in a user-driven interactive environment. Since user requests are (almost) always skewed (or closer to a normal distribution), this issue is bound to happen. If Spark were only meant to be a batch environment, then this would probably have little effect.
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193326#comment-15193326 ] Sean Owen commented on SPARK-13718:
---
This is probably better as a discussion on user@. A more general point here is that an RDD is not at all suitable for point lookups like this at any significant scale, but maybe that's separate, or just the example you chose here to illustrate.

Caching the whole RDD if you'll only ever access X is wasteful, of course. But I presume X varies. If any record may be accessed, all partitions should be cached. Different cache-eviction policies are an interesting topic, but another topic yet again.

Yes, I understand you want partitions to be replicated extra times dynamically. I don't think you're addressing the complications I highlighted above, though; that would come in some kind of design doc later. Still, it matters when reasoning about whether this is likely to be worth pursuing.

You keep talking about imbalanced task processing times but attaching it to data locality. Of course non-local reads are slower, but I don't think that's the problem you're getting at. Right now you want a task to be able to schedule more freely exactly because its data locality doesn't matter much -- right? that's why you picked the fibonacci example? In which case, I don't think you need to do anything, because the task will already eventually schedule somewhere and non-locality won't matter.
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193287#comment-15193287 ] Ioannis Deligiannis commented on SPARK-13718:
-
(reply per paragraph)
That is the point I am trying to get across. An RDD operation will access all partitions, but the processing weight is not the same for every data partition, which is why I gave the fibo example. Since scheduling is bound to RDD partition locality, it underperforms on many occasions. What you describe holds true when data and processing are even, which is not the case in most real-time (=interactive) applications, as requests are user-driven and look more like a normal distribution (so you will get hot partitions).

Let me try to explain how this translates. My cached RDD is 80 GB, containing a billion records. One specific partition (out of thousands) contains a value, say 'X'. In pseudo-code the logic looks like: rdd.filter(f=='X').map("very expensive op").reduce(...). User queries will quickly go over the cached partitions, not really doing anything except in the case of 'X'. So, are we suggesting that using an extra 80 GB is logical, practical or cost-effective? Is there a way to actually make the attached example really scale? (Note that the cluster is not utilized by external processes, but by this code itself.)

The way I can imagine this working is to add it at the RDD level. Similarly to `persist(MEMORY_ONLY)`, attachable caching policies. These could be interfaced, so even a user could attach an eviction policy to an RDD. Out of the box, the basics would be nice (LRU, RR, MRU, LFU), which could also apply to the way Spark currently evicts partitions. In terms of replicas, an API like `rdd.setMaxDynamicReplication(int x)` would also help optimize scheduling.
Finally, for cases where creating the cached RDD is cheap/simple, we could use something like rdd.onNonLocality(Fetch/ReCreate) to either fetch from memory (the way it works now when the RDD is cached) or recreate it (the way it works now when the RDD is not cached and replicas > 1). Note that the proposed names and available options are not concrete suggestions, but try to set a basis for how this could work.
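None of the APIs above exist in Spark; they are this comment's proposal. As a purely hypothetical sketch of the "attachable eviction policy" idea, here is what the LRU variant could look like over cached partition ids (all names are illustrative):

```python
from collections import OrderedDict
from typing import Optional

class LRUPolicy:
    """Least-recently-used eviction over cached partition ids.

    Illustrative only: Spark exposes no pluggable eviction policy; the
    comment above proposes one (alongside RR, MRU and LFU variants).
    """
    def __init__(self, capacity: int):
        self.capacity = capacity
        self._order: "OrderedDict[str, None]" = OrderedDict()  # oldest first

    def touch(self, partition_id: str) -> Optional[str]:
        """Record an access; return the evicted partition id, if any."""
        self._order.pop(partition_id, None)
        self._order[partition_id] = None  # move to the most-recent end
        if len(self._order) > self.capacity:
            evicted, _ = self._order.popitem(last=False)
            return evicted
        return None

policy = LRUPolicy(capacity=2)
evictions = [policy.touch(p) for p in ("p0", "p1", "p0", "p2")]
print(evictions)  # [None, None, None, 'p1'] -- p1 was least recently used
```

The same interface shape would admit the other policies the comment lists (RR, MRU, LFU) by swapping the ordering rule inside touch().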
Attached a profiler to the driver and executors to monitor GC or I/O > issues and everything is normal under low or heavy usage. > 2. Cluster network usage is very low > 3. No issues on Spark UI except that tasks begin to move from LOCAL to ANY > *Cause (Corrected as found details in code):* > 1. Node 'H' is doing marginally more work than the rest (being a little > slower and at almost 100% CPU) > 2. Scheduler hits the default 3000 millis spark.locality.wait and assigns the > task to other nodes > 3. One of the nodes 'X' that accepted the task will try to access the data > from node 'H' HDD. This adds Network I/O to node and also some extra CPU for > I/O. > 4. 'X' time to complete increases ~5x as it goes over Network > 5. Eventually, every node will have a task that is waiting to fetch that > specific partition from node 'H' so cluster is basically blocked on a single > node > What I managed to figure out from the code is that
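A toy sketch of the `onNonLocality(Fetch/ReCreate)` idea floated above. Everything here is hypothetical: the names come from the comment, not from any real Spark API, and plain Python classes stand in for the block manager and lineage:

```python
from enum import Enum

class NonLocality(Enum):
    FETCH = "fetch"        # pull cached blocks over the network (current cached behaviour)
    RECREATE = "recreate"  # recompute the partition from its lineage on the local node

class CachedPartition:
    def __init__(self, data, recreate_fn, on_non_locality=NonLocality.FETCH):
        self.data = data                      # the cached blocks
        self.recreate_fn = recreate_fn        # stand-in for the lineage/DAG
        self.on_non_locality = on_non_locality

    def read(self, local):
        if local:
            return self.data
        if self.on_non_locality is NonLocality.RECREATE:
            # cheap lineage: recompute locally instead of a slow remote fetch
            return self.recreate_fn()
        return self.data  # stands in for fetching over the network

# A partition whose lineage is cheap opts into RECREATE:
p = CachedPartition([1, 2, 3], recreate_fn=lambda: [1, 2, 3],
                    on_non_locality=NonLocality.RECREATE)
print(p.read(local=False))  # produced by recomputation, not a remote fetch
```

The point of the policy object is that the choice between fetching and recomputing is made per RDD by the user, rather than hard-wired into the scheduler.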
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193167#comment-15193167 ] Sean Owen commented on SPARK-13718: ---

It's not so much that users need finer-grained per-partition caching control, since most RDD operations use all partitions every time (excepting special cases like "first"). For this reason, I think this kind of idea is basically equivalent to dynamically increasing replication of the whole RDD.

The issue here is having cached data available only on busy nodes. You can control this a little by asking for 2x replication, so that's already possible. There's no 3x setting right now, and the benefit starts to decrease anyway. A third replica would only help when, generally, over 2/3 of nodes are fully utilized (so that you might frequently not find any slot available next to one of 2 replicas, but might find a slot next to a 3rd replica), yet the cluster is not fully utilized (or else there are just no free slots anyway).

The problem is not so much this idea but the questions it further invites: when do you un-persist extra replicas? When do you stop replicating? Do you wait for N seconds before making a replica? You just get back to a thicker forest of config options, I think, since so much "depends". I'm having trouble seeing a win for the general case here.
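The "over 2/3 of nodes" remark above can be checked with a rough back-of-the-envelope model (my own simplification, not from the thread: assume each node is independently busy with probability p, and a task is locality-starved only when every node holding a replica is busy):

```python
# P(no free slot on any of the k replica nodes) = p**k, so the marginal gain
# of a 3rd replica over a 2nd is p**2 - p**3.
def marginal_gain(p, k=3):
    return p ** (k - 1) - p ** k

# Scan p over [0, 1] to find where the 3rd replica helps most:
best_p = max((i / 1000 for i in range(1001)), key=marginal_gain)
print(best_p)  # peaks at about 2/3 cluster utilization
```

Under this model the third replica pays off most when roughly two thirds of the cluster is busy, and its benefit vanishes both when the cluster is idle and when it is saturated, which matches the diminishing-returns argument.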
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193122#comment-15193122 ] Ioannis Deligiannis commented on SPARK-13718: -

That was my point above about partition replication (maybe not explained that clearly). An RDD is cached and managed per-partition by Spark, but a user can only set the replication factor on the whole RDD. So, if I wanted to replicate my hot partition 10 times, I would need to replicate every RDD partition, which is a waste of resources.

Moving the data from a busy node once is fine (or there could be a flag to re-create the RDD from another preferred location). What makes this a problem is that we continuously hit the same cached partition. Running the example, every node in the cluster will be requesting & waiting for this partition. Adding more replicas of this one partition would solve this without wasting resources.

The other "downsides" you mention are the ones I referred to above with "...product of RDD configuration and not default behavior as they would cause other issues...". Fibonacci was an example of how you can skew the CPU towards a hot partition, not a way to reproduce the issue (the reproduction example is attached). I'd be happy to provide any help I can, but my Scala skills and knowledge of Spark internals are too limited to provide a design doc or code this.
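The memory argument above can be put into numbers. Using the figures quoted in this thread (an 80 GB cached RDD split into 128 partitions), replicating only the hot partition a few extra times is an order of magnitude cheaper than replicating the whole RDD:

```python
# Figures taken from the discussion: 80 GB cached RDD, 128 partitions.
rdd_gb = 80
num_partitions = 128
part_gb = rdd_gb / num_partitions        # GB per partition

extra_for_full_replica = rdd_gb          # today's only option: replicate the whole RDD
extra_for_hot_partition = 10 * part_gb   # replicate one hot partition 10 extra times

print(part_gb, extra_for_full_replica, extra_for_hot_partition)
```

So ten extra copies of a single hot partition cost about 6.25 GB of memory, versus 80 GB for one full extra replica of the RDD.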
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193074#comment-15193074 ] Sean Owen commented on SPARK-13718: ---

(PS: RDDs are already cached per-partition and can be replicated more than once.)

Yes, but this has downsides too. Copying the cached data means serializing all of the data over the network anyway, so nothing is gained there. It also puts the data into memory on the other node. That might or might not be useful; it depends on whether the partition is heavily accessed in the future. But it also doesn't help the original problem here, since you're still proposing copying data off the busy node. So I am not clear this particular line of reasoning helps this case. In the Fibonacci case, data locality isn't a problem anyway; it's just skewed processing over evenly-distributed data.

Something like "smart partition replication" could be useful, but it raises a lot of other questions: what if all partitions are hot? How do you prioritize? Where do you put the partitions? When do you stop replicating? When is replicating worse than reading from local storage once? That is, I think it's got to be pretty complex. If this is headed towards a detailed design doc, OK; otherwise I think this should be closed.
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15193046#comment-15193046 ] Ioannis Deligiannis commented on SPARK-13718: -

It would be hard or impossible to automate the scheduling of such tasks, but exposing scheduling APIs would make such cases easily manageable. In this case, the following behavior would make things a lot better (note that these should be the product of RDD configuration and not default behavior, as they would cause other issues):

Potential solution: when a data-local node is CPU-bound and a cached partition is requested, that partition would be cached on the receiving node as well, making it available to other nodes and effectively increasing the data-local resources for that RDD partition (i.e. instead of replicating the RDD 2x in memory, allow this on a per-partition basis and on (high) demand).

What would the above solve? When you cache an RDD in memory (1 copy), you effectively reduce your data locality from HDFS's n replicas to 1. Even if the cached RDD is simply an in-memory copy of the data with little or no transformation, from the time you cache it, only one node is "data-local". Currently there is no way to deal with this efficiently: you can optimize for performance, but you cannot make better use of your cluster resources. By allowing a specific hot partition to be dynamically replicated, we can use our resources more efficiently. (To make an analogy, this works the same way YouTube replicates videos that are in high demand.)

PS1. When referring to a "hot partition", I mean a partition that uses more resources. Think of an int RDD to which you apply a filter (say x == 1000) and then run a Fibonacci function. Even if your data are evenly partitioned, your processing might be skewed.

PS2. I have attached a design doc that explains the problem better and a Java class that reproduces it. That should make the issue easier to explain.
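The on-demand replication behavior proposed above can be sketched with a toy block registry (a hypothetical stand-in for Spark's block manager; the class and method names are illustration only): once a remote reader has paid for the fetch from the CPU-bound host, it keeps the copy and registers itself as an additional location for future scheduling.

```python
# Toy sketch of replicate-on-demand: when a remote node must fetch a cached
# partition from a busy host, it retains the copy and becomes a new location.
class BlockRegistry:
    def __init__(self):
        self.locations = {}  # partition id -> set of nodes holding it

    def cache(self, part_id, node):
        self.locations.setdefault(part_id, set()).add(node)

    def read(self, part_id, reader, host_busy):
        if reader in self.locations[part_id]:
            return "local"
        if host_busy:
            # Fetch once, then keep the copy: reader becomes a new location,
            # so later tasks can be scheduled data-local on it.
            self.cache(part_id, reader)
        return "remote"

reg = BlockRegistry()
reg.cache("hot", "H")                          # node H holds the hot partition
print(reg.read("hot", "X", host_busy=True))    # remote (first fetch pays the cost)
print(reg.read("hot", "X", host_busy=True))    # local  (replica was retained)
print(sorted(reg.locations["hot"]))            # ['H', 'X']
```

Only the first remote read pays the network cost; after that, node 'X' serves the hot partition locally instead of every task in the cluster queuing on node 'H'.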
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15192309#comment-15192309 ] Sean Owen commented on SPARK-13718: ---

What do you mean that it tries to assign without an available core (slot)? I understand that if the data-local nodes are all busy, and the task is I/O-intensive, then reading remotely is not only going to be slower but will put more load on the busy nodes. But they may not be I/O-bound, just have all slots occupied. Or the job may not be I/O-intensive at all, in which case data locality doesn't help. In that case, not scheduling the task is suboptimal.

But when is it better to not schedule the task at all? You're saying it creates a straggler, but all you're really saying is that things take a while when resources are constrained. What is the better scheduling decision, even given omniscience?
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15182902#comment-15182902 ] Ioannis Deligiannis commented on SPARK-13718: -

True, tuning {{spark.locality.wait}} yields better performance and is application-specific. In essence, however, the issue is that the scheduler re-assigns the task because the data-local node does not have an available core (I believe). When the task is assigned to another node, and that node tries to access resources on the originally busy node, it will in most cases incur longer wait times. As the data are also available on (Replicas - 1) other nodes, the task has better (or equal) chances of a good result elsewhere. Just to be clear, the issue is not about whether non-locality is faster or not; it is that newly scheduled tasks hit the very node that could not accept the task to begin with, creating a straggler.

> Scheduler "creating" straggler node
> -----------------------------------
>
> Key: SPARK-13718
> URL: https://issues.apache.org/jira/browse/SPARK-13718
> Project: Spark
> Issue Type: Improvement
> Components: Scheduler, Spark Core
> Affects Versions: 1.3.1
> Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
> Reporter: Ioannis Deligiannis
> Priority: Minor
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication factor of 3.
> * In-memory data are partitioned in 128 chunks (384 cores in total, i.e. 3 requests can be executed concurrently(-ish)).
> *Action:*
> * Action is a simple sequence of map/filter/reduce.
> * The action operates upon and returns a small subset of data (following the full map over the data).
> * Data are 1x cached serialized in memory (Kryo), so calling the action should not hit the disk under normal conditions.
> * Action network usage is low, as it returns a small number of aggregated results and does not require excessive shuffling.
> * Under low or moderate load, each action is expected to complete in less than 2 seconds.
> *H/W Outlook*
> When the action is called in high numbers, initially the cluster CPU gets close to 100% (which is expected & intended).
> After a while, the cluster utilization reduces significantly, with only one (straggler) node having 100% CPU and a fully utilized network.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O issues; everything is normal under low or heavy usage.
> 2. Cluster network usage is very low.
> 3. No issues on the Spark UI, except that tasks begin to move from LOCAL to ANY.
> *Cause:*
> 1. Node 'H' is doing marginally more work than the rest (being a little slower and at almost 100% CPU).
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns the task to other nodes (in some cases it will assign to NODE, which means loading from HDD, then follow the sequence and fall back to ANY).
> 3. One of the nodes 'X' that accepted the task will eventually try to access the data from node 'H''s HDD. This adds HDD and network I/O to the node, plus some extra CPU for I/O.
> 4. 'X''s time to complete increases ~5x as it involves HDD/network.
> 5. Eventually, every node has a task waiting to fetch that specific partition from node 'H', so the cluster is basically blocked on a single node.
> *Proposed Fix*
> I have not worked with Scala enough to propose a code fix, but on a high level, when a task hits the 'spark.locality.wait' timeout, it should provide a 'hint' to the node accepting the task, so that it uses as its data-source replica one that is not on the node that failed to accept the task in the first place.
> *Workaround*
> Playing with 'spark.locality.wait' values, there is a deterministic value, depending on partitions and config, where the problem ceases to exist.
> *PS1*: I don't have enough Scala skills to follow the issue or propose a fix, but I hope that this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of use-cases that trigger this behaviour.

-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
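The replica 'hint' sketched in the Proposed Fix above comes down to one rule: when the locality wait timed out on a node, read from any other replica. A minimal illustration (function and parameter names are made up for this sketch):

```python
# Pick a read source among the HDFS replica locations, avoiding the node
# that just failed to accept the task within spark.locality.wait.
def pick_source(replicas, busy_node):
    candidates = [n for n in replicas if n != busy_node]
    # Fall back to the busy node only if it is the sole holder of the data.
    return candidates[0] if candidates else busy_node

print(pick_source(["H", "B", "C"], busy_node="H"))  # reads from B, not busy H
```

With a replication factor of 3, this leaves two candidate sources even after excluding the overloaded node, so the fetch no longer piles more I/O onto node 'H'.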
[jira] [Commented] (SPARK-13718) Scheduler "creating" straggler node
[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15182901#comment-15182901 ] Ioannis Deligiannis commented on SPARK-13718: -

Point taken, though I'd rank it higher than Minor, since it severely affects non-batch applications (which in application terms would be considered a bug). In any case, do you think this would be better placed on the mailing list?