[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-22 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142982#comment-14142982
 ] 

Patrick Wendell edited comment on SPARK-3622 at 9/22/14 7:35 AM:
-

In Spark most RDD operations are lazy, so in the example you gave, there is no 
guarantee that rddA and rddB will be computed at the same time. In fact, in this 
case they definitely won't be (weirdly, sortByKey is not lazy in Spark, unlike 
almost all other transformations).
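
(A small illustration of that wrinkle; as of Spark 1.x, sortByKey eagerly runs a 
sampling job to compute range-partition boundaries even before any action is 
called:)

{code}
val pairs = sc.parallelize(Seq((2, "b"), (1, "a"), (3, "c")))
// This line alone launches a job (sampling for the RangePartitioner),
// unlike map/filter, which only build lineage lazily.
val sorted = pairs.sortByKey()
{code}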

I'm trying to think whether there is a case where you can know ahead of time that 
the derivative RDDs (B and C) are definitely going to be computed at the same 
time. It would be interesting to see if two RDDs could be jointly computed 
in that case.

Right now the physical execution is pretty closely tied to the idea of 
computing a single RDD at one time, so I'm not sure it's really possible - this 
is probably different from Hive, where logical plans are more decoupled from 
physical execution.

A case where we could exploit this is something like:

{code}
val a = sc.parallelize(1 to 1000, 100)
val b = a.filter(...)
val c = a.filter(...)
val d = a.filter(...)
b.union(c).count()
{code}

In the above example, Spark will currently submit two stages at the same time 
that will both compute a. But maybe we could do something smarter. If we can 
detect this, we might be able to expose something where users can, e.g., run 
multiple operations on the same underlying RDD and have it work efficiently, 
provided those operations get computed at the same time.
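
For reference, the workaround available today is to persist a before deriving 
the downstream RDDs; a minimal sketch (the predicates are placeholders, not 
part of the original example):

{code}
// With persist(), a's partitions are computed from source once,
// cached, and then reused by downstream reads.
val a = sc.parallelize(1 to 1000, 100).persist()
val b = a.filter(_ % 2 == 0) // placeholder predicate
val c = a.filter(_ % 3 == 0) // placeholder predicate
b.union(c).count() // reads a's cached partitions instead of recomputing a
{code}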

I'm not really sure that this is possible, but I also can't prove it's 
impossible, so that's good!


 Provide a custom transformation that can output multiple RDDs
 -

 Key: SPARK-3622
 URL: https://issues.apache.org/jira/browse/SPARK-3622
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 1.1.0
Reporter: Xuefu Zhang

 All existing transformations return at most one RDD, even those that take 
 user-supplied functions such as mapPartitions(). However, sometimes a 
 user-provided function may need to output multiple RDDs. For instance, a 
 filter function that divides the input RDD into several RDDs. While it's 
 possible to get multiple RDDs by transforming the same RDD multiple times, it 
 may be more efficient to do this concurrently in one shot, especially when the 
 user's existing function is already generating different data sets. This is 
 the case in Hive on Spark, where Hive's map function and reduce function can 
 output different data sets to be consumed by subsequent stages.






[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-21 Thread Patrick Wendell (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142865#comment-14142865
 ] 

Patrick Wendell edited comment on SPARK-3622 at 9/22/14 3:24 AM:
-

Do you mind clarifying a little bit how Hive would use this (maybe with a code 
example)?

Let's say you had a transformation that went from a single RDD A to two RDDs, B 
and C. The normal way to do this if you want to avoid recomputing A would be to 
persist it, then use it to derive both B and C (this will do multiple passes 
over A, but it won't fully recompute A twice).
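
A minimal sketch of that persist-then-derive pattern (the path and the 
transformations are just placeholders):

{code}
// A is computed from its source once; both derivations re-read the
// cached partitions instead of recomputing A.
val rddA = sc.textFile("input").persist() // "input" is a placeholder path
val rddB = rddA.map(line => (line, line.length)) // placeholder derivation of B
val rddC = rddA.filter(_.nonEmpty)               // placeholder derivation of C
rddB.count() // first action materializes and caches A
rddC.count() // possibly much later; A is read from cache, not recomputed
{code}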

I think that doing this in the general case is not possible by definition. The 
user might use B and C at different times, so it's not possible to guarantee 
that A will be computed only once unless you persist A.





[jira] [Comment Edited] (SPARK-3622) Provide a custom transformation that can output multiple RDDs

2014-09-21 Thread Xuefu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-3622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14142881#comment-14142881
 ] 

Xuefu Zhang edited comment on SPARK-3622 at 9/22/14 4:39 AM:
-

Thanks for your comments, [~pwendell]. I understand caching A would be helpful 
if I need to transform it to get B and C separately. My proposal is to get B 
and C in just one pass over A, so A doesn't even need to be cached.

Here is an example of how it might be used in Hive:
{code}
// K and V stand in for the actual key/value types, which were lost
// in the original message.
JavaPairRDD<K, V> table = sparkContext.hadoopRDD(..);
Map<String, JavaPairRDD<K, V>> mappedRDDs = table.mapPartitions(mapFunction);
JavaPairRDD<K, V> rddA = mappedRDDs.get("A");
JavaPairRDD<K, V> rddB = mappedRDDs.get("B");
JavaPairRDD<K, V> sortedRddA = rddA.sortByKey();
JavaPairRDD<K, Iterable<V>> groupedRddB = rddB.groupByKey();
// further processing of sortedRddA and groupedRddB
...
{code}
In this case, mapFunction can return named iterators for A and B. B is 
automatically computed whenever A is computed, and vice versa. Since both are 
computed if either of them is computed, a subsequent reference to either one 
should not recompute them.

The benefits of this: 1) no need to cache A; 2) only one pass over the input.

I'm not sure whether this is feasible in Spark, but Hive's map function does 
exactly this: its operator tree can branch off anywhere, resulting in 
multiple output datasets from a single input dataset.
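
For comparison, a rough single-pass approximation that is possible with today's 
API is to tag each record with its output name and split afterwards (a sketch; 
transformForA and transformForB are hypothetical helpers, and the tagged RDD 
still has to be persisted to avoid repeating the pass):

{code}
// One pass over the input produces both tagged streams; persist()
// keeps that pass from being repeated when the streams are split.
val tagged = table.mapPartitions { iter =>
  iter.flatMap { rec =>
    Seq(("A", transformForA(rec)), ("B", transformForB(rec)))
  }
}.persist()
val rddA = tagged.filter(_._1 == "A").map(_._2) // the "A" output
val rddB = tagged.filter(_._1 == "B").map(_._2) // the "B" output
{code}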

Please let me know if there are more questions.


