[ https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193046#comment-15193046 ]

Ioannis Deligiannis commented on SPARK-13718:
---------------------------------------------

It would be hard, if not impossible, to automate the scheduling of such tasks; 
however, exposing scheduling APIs would make such cases easy to manage. In this 
case the following behavior would make things a lot better (note that this 
should be per-RDD configuration and not default behavior, as it would otherwise 
cause other issues):

Potential solution:
When a data-local node is CPU-bound and a cached partition is requested, cache 
that partition on the receiving node as well, making it available to other 
nodes and effectively increasing the data-local resources for that RDD 
partition (i.e. instead of replicating the whole RDD x2 in memory, allow 
replication on a per-partition basis and on (high) demand).
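
For contrast, here is a minimal sketch of the only knob Spark exposes today: 
whole-RDD in-memory replication via StorageLevel.MEMORY_ONLY_SER_2. The input 
path is a placeholder; the point is that replication applies to every partition 
up front, whereas the proposal above would replicate individual partitions only 
while they are in high demand.

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class WholeRddReplication {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("whole-rdd-replication");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Today's option: two in-memory copies of *every* cached partition,
        // doubling the memory footprint even for partitions nobody contends for.
        JavaRDD<String> data = sc.textFile("hdfs:///some/input")   // placeholder path
                .persist(StorageLevel.MEMORY_ONLY_SER_2());

        // The proposal above would instead replicate only the "hot" partitions,
        // and only while they are under high demand.
        System.out.println(data.count());
        sc.stop();
    }
}
{code}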

What would the above solve?
When you cache an RDD in memory (1 copy), you effectively reduce your data 
locality from HDFS's n replicas to 1. Even if the cached RDD is simply an 
in-memory copy of the data with little or no transformation, from the moment 
you cache it, only one node is "data-local". Currently there is no way to deal 
with this efficiently: you can optimize for performance, but you cannot make 
better use of your cluster resources. By allowing a specific "hot" partition to 
be replicated dynamically, we can use our resources more efficiently. (To make 
an analogy, this works the same way YouTube replicates videos that are in high 
demand.)

PS1. By "hot partition" I mean a partition that uses more resources than the 
others. Think of an int RDD to which you apply a filter (say x == 1000) and 
then run a Fibonacci function on the matches. Even if your data are evenly 
partitioned, your processing might be skewed.
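
A minimal sketch of what I mean (numbers are made up for illustration): the 
partitions are the same size, but only the partitions that happen to hold the 
matching value pay for the expensive function, so CPU load is skewed.

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SkewedProcessing {
    // Deliberately slow, CPU-bound function standing in for "real" work.
    static long fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("skewed-processing"));

        // Evenly sized partitions, but the value 1000 only occurs in the first
        // few partitions, so they end up carrying all of the expensive work.
        List<Integer> values = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            values.add(i < 10_000 ? 1000 : i % 1000);
        }
        JavaRDD<Integer> numbers = sc.parallelize(values, 128).cache();

        long hits = numbers.filter(x -> x == 1000)
                .map(x -> fib(35))          // only the matching rows pay this cost
                .count();

        System.out.println("hits: " + hits);
        sc.stop();
    }
}
{code}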

PS2. I have attached a design doc that explains the problem in more detail and 
a Java class that reproduces it; together they should make the issue easier to 
follow.

> Scheduler "creating" straggler node 
> ------------------------------------
>
>                 Key: SPARK-13718
>                 URL: https://issues.apache.org/jira/browse/SPARK-13718
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler, Spark Core
>    Affects Versions: 1.3.1
>         Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 GB RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>            Reporter: Ioannis Deligiannis
>            Priority: Minor
>         Attachments: TestIssue.java, spark_struggler.jpg
>
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication 
> factor of 3.
> * In-memory data are partitioned in 128 chunks (384 cores in total, i.e. 3 
> requests can be executed concurrently(-ish) )
> *Action:*
> * Action is a simple sequence of map/filter/reduce. 
> * The action operates upon and returns a small subset of data (following the 
> full map over the data).
> * Data are cached once, serialized in memory (Kryo), so calling the action 
> should not hit the disk under normal conditions.
> * Action network usage is low, as it returns a small number of aggregated 
> results and does not require excessive shuffling.
> * Under low or moderate load, each action is expected to complete in less 
> than 2 seconds (a minimal sketch of the workload follows below).
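> A minimal sketch of this workload (the path and the record handling are 
> placeholders; the attached TestIssue.java is the actual reproducer):
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.storage.StorageLevel;
>
> public class CachedAggregation {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf()
>                 .setAppName("cached-aggregation")
>                 .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>
>         // One Kryo-serialized in-memory copy of the data, in 128 partitions.
>         JavaRDD<String> cached = sc.textFile("maprfs:///some/input")   // placeholder path
>                 .repartition(128)
>                 .persist(StorageLevel.MEMORY_ONLY_SER());
>
>         // The action: a full map over the cached data, a selective filter and
>         // a small reduced result, so shuffle and network volume stay low.
>         int total = cached.map(line -> line.length())
>                 .filter(len -> len > 100)
>                 .reduce((a, b) -> a + b);
>         System.out.println(total);
>         sc.stop();
>     }
> }
> {code}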
> *H/W Outlook*
> When the action is called at a high rate, the cluster CPU initially gets 
> close to 100% (which is expected & intended). 
> After a while, cluster utilization drops significantly, with only one 
> (straggler) node at 100% CPU and with its network fully utilized.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O 
> issues; everything is normal under both low and heavy usage. 
> 2. Cluster network usage is very low.
> 3. No issues on the Spark UI, except that tasks begin to move from LOCAL to ANY.
> *Cause (corrected after finding details in the code):* 
> 1. Node 'H' is doing marginally more work than the rest (being a little 
> slower and at almost 100% CPU).
> 2. The scheduler hits the default 3000 ms 'spark.locality.wait' and assigns 
> the task to other nodes. 
> 3. One of the nodes, 'X', that accepted the task tries to access the data 
> from the disk of node 'H'. This adds network I/O to node 'H' as well as some 
> extra CPU for the I/O.
> 4. Node 'X' takes ~5x longer to complete as the data goes over the network. 
> 5. Eventually, every node has a task waiting to fetch that specific 
> partition from node 'H', so the cluster is essentially blocked on a single 
> node.
> What I managed to figure out from the code is that this happens because, if 
> an RDD is cached, the task will use BlockManager.getRemote() and will not 
> recompute the part of the DAG that produced this RDD, so it always hits the 
> node that has cached the block.
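> To make that lookup order concrete, here is a simplified, stand-alone model 
> of the behaviour described above. This is not Spark's actual code; the names 
> are illustrative only.
> {code:java}
> import java.util.Map;
> import java.util.function.Supplier;
>
> // Illustrative model only: once a partition is cached anywhere, the lookup
> // prefers a (possibly slow) remote fetch over recomputing the partition.
> class CachedPartitionLookup<T> {
>     private final Map<Integer, T> localBlocks;   // blocks cached on this node
>     private final Map<Integer, T> remoteBlocks;  // blocks cached on node 'H'
>
>     CachedPartitionLookup(Map<Integer, T> local, Map<Integer, T> remote) {
>         this.localBlocks = local;
>         this.remoteBlocks = remote;
>     }
>
>     T getOrCompute(int partitionId, Supplier<T> recompute) {
>         T local = localBlocks.get(partitionId);
>         if (local != null) {
>             return local;                 // fast path: data-local
>         }
>         T remote = remoteBlocks.get(partitionId);
>         if (remote != null) {
>             return remote;                // always taken when cached remotely,
>         }                                 // even if node 'H' is saturated
>         return recompute.get();           // only reached if cached nowhere
>     }
> }
> {code}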
> *Proposed Fix*
> I have not worked with the Scala & Spark source code enough to propose a code 
> fix, but at a high level, when a task hits the 'spark.locality.wait' timeout, 
> it could consult a new configuration option, e.g. 
> 'recomputeRddAfterLocalityTimeout', instead of always fetching the cached 
> RDD remotely. This would be very useful if it could also be set manually per RDD.
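> Purely as an illustration of what I have in mind (neither this key nor a 
> per-RDD hook exists in Spark today), configuration could look something like:
> {code:java}
> import org.apache.spark.SparkConf;
>
> public class RecomputeOnTimeoutSketch {
>     public static void main(String[] args) {
>         // Hypothetical: keep the normal locality wait, but recompute the
>         // partition instead of fetching the cached block remotely once the
>         // wait expires. The second key below does not exist; it is the proposal.
>         SparkConf conf = new SparkConf()
>                 .set("spark.locality.wait", "3000")
>                 .set("spark.recomputeRddAfterLocalityTimeout", "true");  // hypothetical
>     }
> }
> {code}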
> *Workaround*
> By experimenting with 'spark.locality.wait' values, there is a deterministic 
> value, depending on the partitions and configuration, at which the problem 
> ceases to exist.
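> For example, raising the wait (the value is in milliseconds in this version; 
> the right number depends on partition count and cluster config, so it has to 
> be found by experiment):
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class LocalityWaitWorkaround {
>     public static void main(String[] args) {
>         // Wait longer for the data-local node instead of falling back to ANY
>         // and fetching the cached block over the network from node 'H'.
>         SparkConf conf = new SparkConf()
>                 .setAppName("locality-wait-workaround")
>                 .set("spark.locality.wait", "30000");   // example value; tune per workload
>         JavaSparkContext sc = new JavaSparkContext(conf);
>         // ... run the cached action as usual ...
>         sc.stop();
>     }
> }
> {code}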
> *PS1*: I don't have enough Scala skills to follow the issue through the code 
> or propose a fix, but I hope this has enough information to make sense.
> *PS2*: Debugging this issue made me realize that there can be a lot of 
> use-cases that trigger this behaviour.


