[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ioannis Deligiannis updated SPARK-13718:
----------------------------------------
    Description: 
*Data:*
* Assume an even distribution of data across the cluster with a replication 
factor of 3.
* In-memory data are partitioned into 128 chunks (384 cores in total, i.e. roughly 3 such requests can be executed concurrently).

*Action:*
* The action is a simple sequence of map/filter/reduce.
* It operates upon and returns a small subset of the data (following the full map over the data).
* The data are cached once, serialized in memory (Kryo), so calling the action should not hit the disk under normal conditions (a minimal sketch of this setup follows the list).
* Network usage is low, as the action returns a small number of aggregated results and does not require excessive shuffling.
* Under low or moderate load, each action is expected to complete in less than 2 seconds.
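
A minimal sketch of the setup described above, for illustration only (the application name, input path and parsing logic are assumptions, not the actual job):
{noformat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Kryo serializer for the serialized in-memory cache
val conf = new SparkConf()
  .setAppName("cached-aggregation-example")   // assumed name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// 128 in-memory partitions, cached once as serialized blocks,
// so the action should not hit the disk under normal conditions
val data = sc.textFile("maprfs:///data/events")   // assumed path
  .repartition(128)
  .persist(StorageLevel.MEMORY_ONLY_SER)

// Simple map/filter/reduce returning a small aggregated result
val total = data
  .map(_.split(','))
  .filter(_.length > 1)
  .map(fields => fields(1).toLong)              // assumed field layout
  .reduce(_ + _)
{noformat}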

*H/W Outlook*
When the action is invoked at high volume, the cluster CPU initially gets close to 100% (which is expected & intended).
After a while, cluster utilization drops significantly, with only one (straggler) node at 100% CPU and with its network fully utilized.

*Diagnosis:*
1. Attached a profiler to the driver and executors to monitor for GC or I/O issues; everything is normal under both low and heavy usage.
2. Cluster network usage is very low.
3. No issues on the Spark UI, except that tasks begin to move from LOCAL to ANY.

*Cause (corrected after finding the details in the code):*
1. Node 'H' is doing marginally more work than the rest (being a little slower and at almost 100% CPU).
2. The scheduler hits the default 3000 ms 'spark.locality.wait' and assigns the task to other nodes.
3. One of the nodes, 'X', that accepted the task tries to read the data from the HDD of node 'H'. This adds network I/O to node 'H', plus some extra CPU for the I/O.
4. 'X' takes ~5x longer to complete because the data goes over the network.
5. Eventually, every node has a task waiting to fetch that specific partition from node 'H', so the cluster is effectively blocked on a single node.

From what I managed to figure out in the code, this happens because if an RDD is cached, the task will use BlockManager.getRemote() to fetch the block rather than recompute the part of the DAG that produced the RDD, and will therefore always hit the node that has cached the partition (see the extract below).
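
For reference, the scheduler side of this is visible in the extract from org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(...) quoted in the earlier version of this description: cache locations short-circuit every other placement preference.
{noformat}
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs),
    // get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
{noformat}
Once the locality wait expires and the task runs on a non-local node anyway, the cached block is fetched over the network via BlockManager.getRemote() instead of being recomputed from lineage.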

*Proposed Fix*
I have not worked with the Scala & Spark source code enough to propose a code fix, but at a high level, when a task hits the 'spark.locality.wait' timeout, it could consult a new configuration, e.g. recomputeRddAfterLocalityTimeout, instead of always trying to fetch the cached RDD blocks. This would be even more useful if it could also be set manually on the RDD. A hypothetical usage sketch follows.
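
The sketch below is hypothetical (the setting and the per-RDD hook do not exist in Spark; both names, including the key prefix, are assumptions for illustration):
{noformat}
import org.apache.spark.SparkConf

// Hypothetical only: recomputeRddAfterLocalityTimeout is a proposed setting,
// not part of Spark 1.3.1.
val conf = new SparkConf()
  // existing setting: how long to wait for a data-local slot (default 3000 ms)
  .set("spark.locality.wait", "3000")
  // proposed setting: once the locality wait expires, recompute the partition
  // from lineage on the node that received the task instead of fetching the
  // cached block over the network from the busy node
  .set("spark.recomputeRddAfterLocalityTimeout", "true")

// ...and ideally a per-RDD override (also hypothetical):
// cachedRdd.recomputeAfterLocalityTimeout(true)
{noformat}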

*Workaround*
By tuning 'spark.locality.wait', there is a deterministic value, depending on the number of partitions and the configuration, at which the problem ceases to exist.
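
For illustration only (the right value depends on the number of partitions and the cluster configuration; 10000 ms is an assumed example, not the value used here):
{noformat}
// spark.locality.wait is a real setting, expressed in milliseconds in Spark 1.3.x
// (default 3000). A higher value keeps tasks waiting for a data-local slot
// instead of falling back to ANY and pulling the cached partition over the
// network from the busy node.
val conf = new SparkConf().set("spark.locality.wait", "10000")   // illustrative value

// or at submit time:
//   spark-submit --conf spark.locality.wait=10000 ...
{noformat}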

*PS1*: I don't have enough Scala skills to follow the issue further or propose a fix, but I hope this has enough information to make sense.
*PS2*: Debugging this issue made me realize that there can be a lot of use cases that trigger this behaviour.


  was:
*Data:*
* Assume an even distribution of data across the cluster with a replication 
factor of 3.
* In-memory data are partitioned into 128 chunks (384 cores in total, i.e. roughly 3 such requests can be executed concurrently).

*Action:*
* The action is a simple sequence of map/filter/reduce.
* It operates upon and returns a small subset of the data (following the full map over the data).
* The data are cached once, serialized in memory (Kryo), so calling the action should not hit the disk under normal conditions.
* Network usage is low, as the action returns a small number of aggregated results and does not require excessive shuffling.
* Under low or moderate load, each action is expected to complete in less than 2 seconds.

*H/W Outlook*
When the action is invoked at high volume, the cluster CPU initially gets close to 100% (which is expected & intended).
After a while, cluster utilization drops significantly, with only one (straggler) node at 100% CPU and with its network fully utilized.

*Diagnosis:*
1. Attached a profiler to the driver and executors to monitor for GC or I/O issues; everything is normal under both low and heavy usage.
2. Cluster network usage is very low.
3. No issues on the Spark UI, except that tasks begin to move from LOCAL to ANY.

*Cause (corrected after finding the details in the code):*
1. Node 'H' is doing marginally more work than the rest (being a little slower and at almost 100% CPU).
2. The scheduler hits the default 3000 ms 'spark.locality.wait' and assigns the task to other nodes.
3. One of the nodes, 'X', that accepted the task tries to read the data from the HDD of node 'H'. This adds network I/O to node 'H', plus some extra CPU for the I/O.
4. 'X' takes ~5x longer to complete because the data goes over the network.
5. Eventually, every node has a task waiting to fetch that specific partition from node 'H', so the cluster is effectively blocked on a single node.

IMO this is caused by the fact that when a partition is cached, its preferred location is always set to the node holding the cache. This makes the *false* assumption that the network is always faster than the local HDD and in many cases ends up creating "stragglers".
See the extract from org.apache.spark.scheduler.DAGScheduler.getPreferredLocsInternal(...):
{noformat}
    // If the partition is cached, return the cache locations
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty) {
      return cached
    }
    // If the RDD has some placement preferences (as is the case for input RDDs),
    // get those
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
{noformat}

*Proposed Fix*
I have not worked with the Scala & Spark source code enough to propose a code fix, but at a high level, when a task hits the 'spark.locality.wait' timeout, it could provide a configuration, e.g. usePreferredLocationOnLocalityTimeout, that changes the behaviour to fall back to `preferredLocations` instead of always using the cached location. A rough sketch of the idea follows.
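
The sketch below reworks the extract above under that idea; localityWaitExpired is a hypothetical signal that the current scheduler does not expose.
{noformat}
    // Hypothetical variant of getPreferredLocsInternal: honour the cache
    // location only while the locality wait has not expired, otherwise fall
    // back to the RDD's own preferred locations (e.g. the file-system replicas).
    val cached = getCacheLocs(rdd)(partition)
    if (cached.nonEmpty && !localityWaitExpired) {
      return cached
    }
    val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
    if (rddPrefs.nonEmpty) {
      return rddPrefs.map(TaskLocation(_))
    }
{noformat}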

*Workaround*
By tuning 'spark.locality.wait', there is a deterministic value, depending on the number of partitions and the configuration, at which the problem ceases to exist.

*PS1*: I don't have enough Scala skills to follow the issue further or propose a fix, but I hope this has enough information to make sense.
*PS2*: Debugging this issue made me realize that there can be a lot of use cases that trigger this behaviour.



> Scheduler "creating" straggler node 
> ------------------------------------
>
>                 Key: SPARK-13718
>                 URL: https://issues.apache.org/jira/browse/SPARK-13718
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler, Spark Core
>    Affects Versions: 1.3.1
>         Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>            Reporter: Ioannis Deligiannis
>            Priority: Minor


