[ 
https://issues.apache.org/jira/browse/SPARK-13718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15193378#comment-15193378
 ] 

Ioannis Deligiannis commented on SPARK-13718:
---------------------------------------------

(reply per paragraph)

To provide a bit more context: I am not talking about point lookups. Assume 
you want your users to analyze DNA ad hoc, and your RDD contains one record 
per animal (e.g. Dinosaur, butterfly, worm). If you expose an interactive API 
(e.g. Spark SQL, REST-based or whatever), most user queries will focus on 
Dinosaurs. Say the (cached) Dinosaur DNA record is 10 GB. The "query" can be 
loosely written as mammalRDD.filter(id=="Dinosaurs").map(processDNA).collect();
So what will the Spark scheduler do? It will schedule a task for every 
partition, but the executor processing the Dinosaur record will take 10 
minutes while the executors working on the other records finish in 
milliseconds. Because 'Dinosaur' is a popular subject, 95% of your user 
queries will be scheduled to the one node that holds the cached partition. So 
the next user occupies an executor core for 10 minutes, and so on, until the 
executor that holds the cached data cannot accept any new tasks. The scheduler 
will then allocate a remote executor, which has to fetch the 10 GB partition 
from the executor holding the cached copy. You can see that once the scheduler 
starts assigning tasks to other executors, the cluster goes into a "dark 
place" where it performs horribly, primarily because it does not use its 
resources efficiently.
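
To make the skew concrete, here is a rough, self-contained sketch of that 
pattern using the Java API (class and method names such as DnaRecord and 
processDNA are placeholders, not the real job):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

public class HotPartitionExample {

    // Placeholder record type: one entry per animal; the real DNA blob for
    // "Dinosaurs" is ~10 GB, the others are comparatively tiny.
    public static class DnaRecord implements Serializable {
        public final String id;
        public final byte[] dna;
        public DnaRecord(String id, byte[] dna) { this.id = id; this.dna = dna; }
    }

    // Stand-in for the expensive per-record analysis (~10 minutes on the
    // Dinosaur record, milliseconds on the rest).
    public static long processDNA(DnaRecord r) {
        return r.dna.length;
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("hot-partition-demo");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<DnaRecord> mammalRDD = sc.parallelize(Arrays.asList(
                new DnaRecord("Dinosaurs", new byte[32]),   // the hot record
                new DnaRecord("butterfly", new byte[8]),
                new DnaRecord("worm", new byte[8])), 3);

        // 1x replication: each cached partition lives on exactly one executor.
        mammalRDD.cache();

        // ~95% of interactive requests look like this, so almost every task
        // wants the single executor holding the cached "Dinosaurs" partition.
        List<Long> result = mammalRDD
                .filter(r -> r.id.equals("Dinosaurs"))
                .map(HotPartitionExample::processDNA)
                .collect();

        System.out.println(result);
        sc.stop();
    }
}
{code}

Once the cores on that one executor are busy, every further request either 
queues behind them or is scheduled remotely and has to pull the 10 GB block 
over the network.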

Eventually, all of the RDD is required, but 1x replication is fine for that. 
What would work is for only the hot partition to be replicated more than once.

Agreed, but here I was making the case both for normal caching and for what we 
could call "hot-replication". The caching algorithms are related and could be 
shared.

As I tried to illustrate with the example above, there are natural use cases 
that rely on Spark both for querying and for distributed processing. I believe 
the pressure here will only grow, as it affects use cases many of which 
concern financial institutions. If Spark is to be a leader in interactive big 
data processing (user-driven requests), then this will have to be addressed. 
If you have a look at SO or even user@, there is a growing number of people 
trying to do this, and on SO I have noticed a few cases where I am pretty sure 
the underlying factor is this limitation.

You are right, I keep talking about imbalanced task processing times and 
linking them to data locality because (cache) locality is the main cause. If I 
do not cache my RDD, the Spark scheduler will use the `preferred locations` to 
schedule a task, whereas once it is cached the scheduler will only use the 
owner of the cached partition as the location to fetch from. My intention is 
not to make something slow faster; I have a workaround for that. My ultimate 
goal is to be able to utilize my cluster resources in a user-driven 
interactive environment. Since user requests are (almost) always skewed (or 
closer to a normal distribution), this issue is bound to happen. If Spark were 
only meant to be a batch environment, this would probably have little effect.
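
For reference, the workaround I mention is simply raising the locality 
timeouts; a minimal sketch (the values are illustrative, and in 1.3.x they are 
plain milliseconds):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalityWaitWorkaround {
    public static void main(String[] args) {
        // Default is 3000 ms; raising it keeps tasks queued on the executor that
        // owns the cached partition instead of falling back to executors that
        // would have to fetch the 10 GB block over the network.
        SparkConf conf = new SparkConf()
                .setAppName("locality-wait-workaround")
                .set("spark.locality.wait", "30000"); // illustrative value, in ms

        // The individual locality levels can also be tuned separately:
        // conf.set("spark.locality.wait.process", "30000");
        // conf.set("spark.locality.wait.node", "30000");
        // conf.set("spark.locality.wait.rack", "30000");

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... submit the same jobs against the cached RDD as before ...
        sc.stop();
    }
}
{code}

This keeps the cluster out of the "dark place" for a known partition count and 
configuration, but it is a tuning exercise rather than a fix.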

> Scheduler "creating" straggler node 
> ------------------------------------
>
>                 Key: SPARK-13718
>                 URL: https://issues.apache.org/jira/browse/SPARK-13718
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler, Spark Core
>    Affects Versions: 1.3.1
>         Environment: Spark 1.3.1
> MapR-FS
> Single Rack
> Standalone mode scheduling
> 8 node cluster
> 48 cores & 512 RAM / node
> Data Replication factor of 3
> Each Node has 4 Spark executors configured with 12 cores each and 22GB of RAM.
>            Reporter: Ioannis Deligiannis
>            Priority: Minor
>         Attachments: TestIssue.java, spark_struggler.jpg
>
>
> *Data:*
> * Assume an even distribution of data across the cluster with a replication 
> factor of 3.
> * In-memory data are partitioned into 128 chunks (384 cores in total, i.e. 3 
> requests can be executed concurrently(-ish)).
> *Action:*
> * Action is a simple sequence of map/filter/reduce. 
> * The action operates upon and returns a small subset of data (following the 
> full map over the data).
> * Data are cached once (1x), serialized in memory (Kryo), so calling the 
> action should not hit the disk under normal conditions.
> * Action network usage is low, as it returns a small number of aggregated 
> results and does not require excessive shuffling.
> * Under low or moderate load, each action is expected to complete in less 
> than 2 seconds.
> *H/W Outlook*
> When the action is invoked at high volume, the cluster CPU initially gets 
> close to 100% (which is expected & intended). 
> After a while, cluster utilization drops significantly, with only one 
> (straggler) node at 100% CPU and with its network fully utilized.
> *Diagnosis:*
> 1. Attached a profiler to the driver and executors to monitor GC or I/O 
> issues and everything is normal under low or heavy usage. 
> 2. Cluster network usage is very low
> 3. No issues on Spark UI except that tasks begin to move from LOCAL to ANY
> *Cause (corrected after finding details in the code):* 
> 1. Node 'H' is doing marginally more work than the rest (being a little 
> slower and at almost 100% CPU)
> 2. The scheduler hits the default 3000 ms spark.locality.wait and assigns 
> the task to other nodes. 
> 3. One of the nodes, 'X', that accepted the task will try to read the data 
> from the disk of node 'H'. This adds network I/O to node 'H' and also some 
> extra CPU for I/O.
> 4. The time for 'X' to complete increases ~5x as the data goes over the 
> network. 
> 5. Eventually, every node has a task that is waiting to fetch that specific 
> partition from node 'H', so the cluster is effectively blocked on a single 
> node.
> What I managed to figure out from the code is that this happens because, if 
> an RDD is cached, Spark will use BlockManager.getRemote() rather than 
> recompute the part of the DAG that produced the RDD, and hence will always 
> hit the node that has cached it.
> *Proposed Fix*
> I have not worked with Scala & Spark source code enough to propose a code 
> fix, but on a high level, when a task hits the 'spark.locality.wait' timeout, 
> it could make use of a new configuration e.g. 
> recomputeRddAfterLocalityTimeout instead of always trying to get the cached 
> RDD. This would be very useful if it could also be manually set on the RDD.
> *Workaround*
> By tuning 'spark.locality.wait', a deterministic value can be found 
> (depending on partitioning and configuration) at which the problem ceases to 
> exist.
> *PS1* : I don't have enough Scala skills to follow the issue or propose a 
> fix, but I hope this has enough information to make sense.
> *PS2* : Debugging this issue made me realize that there can be a lot of 
> use-cases that trigger this behaviour
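
For completeness, the cache setup described in the quoted report is roughly 
the following (the input path is a placeholder; 128 partitions as in the 
report):

{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.StorageLevels;

public class SerializedCacheSetup {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("kryo-serialized-cache")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder input; the real data set is partitioned into 128 chunks.
        JavaRDD<String> data = sc.textFile("maprfs:///path/to/input", 128);

        // Serialized in memory (Kryo), single copy of each partition, so a
        // healthy action should never touch disk for its input.
        data.persist(StorageLevels.MEMORY_ONLY_SER);
        data.count(); // materialize the cache

        sc.stop();
    }
}
{code}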


