[ 
https://issues.apache.org/jira/browse/SPARK-5140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14304510#comment-14304510
 ] 

Corey J. Nolet commented on SPARK-5140:
---------------------------------------

I think the problem is that when actions are performed on RDDs from multiple 
threads, the SparkContext on the driver that schedules the DAG should be able 
to see that the two RDDs depend on the same parent and synchronize them so 
that only one computes the parent at a time, whether it is cached or not 
(you'd assume the parent would be getting cached, but I think this change 
would still help cases where it isn't). 

The fact that I did:

val rdd1 = input data -> transform data -> groupBy -> etc... -> cache
val rdd2 = future { rdd1.transform.groupBy.saveAsSequenceFile() }
val rdd3 = future { rdd1.transform.groupBy.saveAsSequenceFile() }

has unexpected results: I find that rdd1 was assigned an id and then computed 
completely separately for rdd2 and for rdd3. I would have expected that, 
whether cached or not, when run in separate threads rdd1 would be assigned an 
id, rdd2 would cause it to begin running through its stages, and rdd3 would 
pause because it is waiting on rdd1's stages to complete. What I actually see 
is that, after rdd2 and rdd3 both run and concurrently recompute rdd1, the 
storage page shows rdd1 as 200% cached. This causes real issues when I have 50 
or so RDDs calling saveAsSequenceFile() that all share dependencies on parent 
RDDs (which may not always be known at creation time without introspecting 
the lineage in my own tree). 
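To make the expected semantics concrete, here is a plain-Scala sketch (no Spark involved; all names are illustrative) of the "compute the parent once, have both children wait on it" behavior described above. The parent computation is memoized behind a single Future via a lazy val, so two concurrent children trigger it exactly once and share its result:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Count how many times the "parent job" actually executes.
val parentRuns = new AtomicInteger(0)

// lazy val gives thread-safe, one-shot initialization: the Future body runs
// once no matter how many threads force it concurrently.
lazy val parentOnce: Future[Seq[Int]] = Future {
  parentRuns.incrementAndGet() // record a run of the parent
  Thread.sleep(100)            // stand-in for the expensive parent stages
  (0 until 8).toSeq
}

// Each "child" derives from the shared parent instead of recomputing it.
def child(): Future[Int] = parentOnce.map(_.sum)

val results = Await.result(Future.sequence(Seq(child(), child())), 10.seconds)
// Both children see the same parent result, and the parent ran only once.
println(s"results=$results parentRuns=${parentRuns.get}")
```

This is only the desired semantics at the Future level, not a proposal for how the DAGScheduler would implement it.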

Now I basically have to do the scheduling myself: I have to determine what 
depends on what and coordinate the concurrent runs on my own. It seems like 
the DAG scheduler already knows this and should be able to make use of it.
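For reference, the hand-rolled scheduling described above can be sketched in plain Scala (again, no Spark; names are illustrative): each child job is composed from its parent's Future, so dependents only start once the shared parent has finished, and the parent runs once:

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Thread-safe record of the order in which jobs complete.
val completionOrder = new java.util.concurrent.ConcurrentLinkedQueue[String]()

// A "job" runs only after all of its dependencies have completed.
def job(name: String, deps: Seq[Future[Unit]]): Future[Unit] =
  Future.sequence(deps).map { _ =>
    completionOrder.add(name) // record when this job finishes
  }

val parent = job("parent", Nil)
val child1 = job("child1", Seq(parent)) // both children wait on the same parent
val child2 = job("child2", Seq(parent))

Await.result(Future.sequence(Seq(child1, child2)), 10.seconds)
println(completionOrder.peek()) // "parent" always completes first
```

This is exactly the bookkeeping the comment argues the DAGScheduler could do automatically, since it already has the dependency graph.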

> Two RDDs which are scheduled concurrently should be able to wait on parent in 
> all cases
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-5140
>                 URL: https://issues.apache.org/jira/browse/SPARK-5140
>             Project: Spark
>          Issue Type: New Feature
>            Reporter: Corey J. Nolet
>              Labels: features
>
> Not sure if this would change too much of the internals to be included in 
> 1.2.1, but it would be very helpful if it could be.
> This ticket is from a discussion between myself and [~ilikerps]. Here's the 
> result of some testing that [~ilikerps] did:
> bq. I did some testing as well, and it turns out the "wait for other guy to 
> finish caching" logic is on a per-task basis, and it only works on tasks that 
> happen to be executing on the same machine. 
> bq. Once a partition is cached, we will schedule tasks that touch that 
> partition on that executor. The problem here, though, is that the cache is in 
> progress, and so the tasks are still scheduled randomly (or with whatever 
> locality the data source has), so tasks which end up on different machines 
> will not see that the cache is already in progress.
> bq. Here was my test, by the way:
> {code}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent._
> import scala.concurrent.duration._
> val rdd = sc.parallelize(0 until 8).map(i => { Thread.sleep(10000); i }).cache()
> val futures = (0 until 4).map { _ => Future { rdd.count } }
> Await.result(Future.sequence(futures), 120.second)
> {code}
> bq. Note that I run the future 4 times in parallel. I found that the first 
> run has all tasks take 10 seconds. The second has about 50% of its tasks take 
> 10 seconds, and the rest just wait for the first stage to finish. The last 
> two runs have no tasks that take 10 seconds; all wait for the first two 
> stages to finish.
> What we want is the ability to fire off a job and have the DAG figure out 
> that two RDDs depend on the same parent so that when the children are 
> scheduled concurrently, the first one to start will activate the parent and 
> both will wait on the parent. When the parent is done, they will both be able 
> to finish their work concurrently. We are trying to use this pattern by 
> having the parent cache results.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
