[jira] [Commented] (SPARK-3105) Calling cache() after RDDs are pipelined has no effect in PySpark

2014-10-02 Thread Nicholas Chammas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14156936#comment-14156936
 ] 

Nicholas Chammas commented on SPARK-3105:
-

I think it's definitely important for the larger project that the 3 APIs 
(Scala, Java, and Python) have semantics that are as consistent as possible. 
And for what it's worth, the Scala/Java semantics in this case seem nicer. 

People learn about the DAG as a distinguishing feature of Spark, so it might 
seem strange in PySpark that caching an RDD earlier in a lineage confers no 
benefit on descendant RDDs. Whether the descendant RDDs were defined before or 
after the caching seems like something people shouldn't have to think about.

It sounds like this is a non-trivial change to make, and I don't fully 
appreciate the other implications it might have, but it seems like a good thing 
to me.

> Calling cache() after RDDs are pipelined has no effect in PySpark
> -
>
> Key: SPARK-3105
> URL: https://issues.apache.org/jira/browse/SPARK-3105
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark
>Affects Versions: 1.0.0, 1.1.0
>Reporter: Josh Rosen
>Assignee: Josh Rosen
>
> PySpark's PipelinedRDD decides whether to pipeline transformations by 
> checking whether those transformations are pipelinable _at the time that the 
> PipelinedRDD objects are created_ rather than at the time that we invoke 
> actions.  This might lead to problems if we call {{cache()}} on an RDD after 
> it's already been used in a pipeline:
> {code}
> rdd1 = sc.parallelize(range(100)).map(lambda x: x)
> rdd2 = rdd1.map(lambda x: 2 * x)
> rdd1.cache()
> rdd2.collect()
> {code}
> When I run this code, I'd expect {{cache()}} to break the pipeline and cache 
> intermediate results, but instead the two transformations are pipelined 
> together in Python, effectively ignoring the {{cache()}}.
> Note that {{cache()}} works properly if we call it before performing any 
> other transformations on the RDD:
> {code}
> rdd1 = sc.parallelize(range(100)).map(lambda x: x).cache()
> rdd2 = rdd1.map(lambda x: 2 * x)
> rdd2.collect()
> {code}
> This works as expected and caches {{rdd1}}.
> To fix this, I think we should dynamically decide whether to pipeline when we 
> actually perform actions, rather than statically deciding when we create the 
> RDDs.
> We should also add tests for this.
> (Thanks to [~tdas] for pointing out this issue.)






[jira] [Commented] (SPARK-3105) Calling cache() after RDDs are pipelined has no effect in PySpark

2014-10-01 Thread Josh Rosen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14156088#comment-14156088
 ] 

Josh Rosen commented on SPARK-3105:
---

I discussed this with [~tdas] and [~davies] today.

As it stands now, there's a small discrepancy between the Scala and Python 
semantics for cache() and persist().  In Scala, calling cache() or persist() 
both returns an RDD _and_ changes the persistence of the RDD instance it was 
called on, so running

{code}
val a = sc.parallelize(...).map(...)
val b = a.map(...)
a.count()
b.count()

a.cache()
a.count()
b.count()
{code}

will result in {{b.count()}} using a cached copy of {{a}}.

In Python, as described above, changing the persistence level of an RDD will 
not automatically cause that persistence change to be reflected in existing 
RDDs that were generated by transforming the original non-persisted RDD.
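
For comparison, here is a minimal PySpark sketch of the same sequence (per the 
pipelining behavior described in this issue, the final b.count() does not 
benefit from the cache):

{code}
a = sc.parallelize(range(100)).map(lambda x: x)
b = a.map(lambda x: 2 * x)
a.count()
b.count()

a.cache()
a.count()   # materializes and caches a
b.count()   # b was already pipelined past a, so the cache is not used
{code}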

In all languages, calling cache() or persist() and performing subsequent 
transformations on the RDDs returned from cache() / persist() will work as 
expected.  

One scenario where the Python semantics might be annoying is in an IPython 
notebook.  Say that a user defines some base RDDs in cell #1, some transformed 
RDDs in cell #2, then performs actions in cell #3, calls cache() in cell #4, 
then goes back and re-runs cell #3.  In this case, the cache() won't have any 
effect.
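
Concretely, such a notebook session would look something like this (cell 
contents are illustrative, not taken from an actual report):

{code}
# Cell 1: define a base RDD
rdd1 = sc.parallelize(range(100)).map(lambda x: x)

# Cell 2: define a transformed RDD
rdd2 = rdd1.map(lambda x: 2 * x)

# Cell 3: perform an action
rdd2.collect()

# Cell 4: cache the base RDD
rdd1.cache()

# Re-running Cell 3 still recomputes rdd1 from scratch, because rdd2 was
# pipelined past rdd1 before cache() was called.
{code}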

On the one hand, I suppose we could argue that it's best to keep the semantics 
as close as possible across all languages.

Unfortunately, this would require us to make some large internal changes to 
PySpark in order to implement the Scala/Java semantics.  If we decide that this 
is worth pursuing, I can post a more detailed proposal outlining the necessary 
changes (the "tl;dr" of my proposed approach is "establish a one-to-one mapping 
between JavaRDDs and PySpark RDDs and defer the pipelining of Python functions 
to execution time").

In my interpretation, the Spark Programming Guide [seems to 
suggest|https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence]
 that persist() and cache() should modify the instance that they're called on 
rather than returning a new, different RDD which happens to be persisted:

{quote}
You can mark an RDD to be persisted using the persist() or cache() methods on 
it. 
{quote}

"Marking" an RDD sounds like it modifies the metadata of that RDD rather than 
returning a new one (maybe I'm reading too much into this, though).

Does anyone have strong opinions on whether we should change the PySpark 
semantics to match Scala's, or examples of real use-cases where PySpark's 
current cache() semantics are confusing?



