[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join

2015-11-14, Zhan Zhang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005682#comment-15005682 ]

Zhan Zhang commented on SPARK-11704:
------------------------------------

[~maropu] You are right. I meant that fetching data over the network is a big
overhead. Feel free to work on it.

> Optimize the Cartesian Join
> ---------------------------
>
> Key: SPARK-11704
> URL: https://issues.apache.org/jira/browse/SPARK-11704
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Zhan Zhang
>
> Currently CartesianProduct relies on RDD.cartesian, in which the computation
> is realized as follows:
>
>   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
>     val currSplit = split.asInstanceOf[CartesianPartition]
>     for (x <- rdd1.iterator(currSplit.s1, context);
>          y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
>   }
>
> In the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times,
> which is really heavy and may never finish if n is large, especially when
> rdd2 comes from a ShuffleRDD.
> We should optimize CartesianProduct by caching rightResults. The problem is
> that, AFAIK, we don't have a cleanup hook to unpersist rightResults; I think
> we should add a cleanup hook that runs after query execution.
> With such a hook available, we can easily optimize this kind of Cartesian
> join, and I believe the cleanup hook may also benefit other query
> optimizations.
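
As an illustration of the proposed caching, here is a minimal RDD-level sketch
(an example only, not the actual CartesianProduct change; the app name, data
sizes, and storage level are invented for illustration):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    val sc = new SparkContext(new SparkConf().setAppName("cartesian-cache-sketch"))
    val rdd1 = sc.parallelize(1 to 1000000)          // large left side
    val rdd2 = sc.parallelize(1 to 100).map(_ * 2)   // right side, e.g. post-shuffle

    // Materialize the right side once instead of re-fetching or recomputing
    // it for every element of the left side's partitions.
    val right = rdd2.persist(StorageLevel.MEMORY_AND_DISK)
    right.count()                        // force materialization
    val product = rdd1.cartesian(right)
    product.count()                      // consume the result
    // The point of the issue: without a post-query cleanup hook, SQL's
    // CartesianProduct has no natural place for this unpersist call.
    right.unpersist()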





[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join

2015-11-13, Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005111#comment-15005111 ]

Takeshi Yamamuro commented on SPARK-11704:
------------------------------------------

It seems to me that the earlier stages of rdd2 are skipped in every iteration
except the first one when rdd2 comes from a ShuffleRDD; only the shuffle fetch
is repeated.
That said, this optimization is still worth doing.




[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join

2015-11-13, Zhan Zhang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005134#comment-15005134 ]

Zhan Zhang commented on SPARK-11704:
------------------------------------

[~maropu] Maybe I misunderstand. If RDD2 comes from a ShuffleRDD, each new
iterator will fetch from the network because RDD2 is not cached. Is the
ShuffleRDD cached automatically?




[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join

2015-11-13, Zhan Zhang (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005136#comment-15005136 ]

Zhan Zhang commented on SPARK-11704:
------------------------------------

I think we can add a cleanup hook to SQLContext and, when the query is done,
invoke all of the registered cleanup hooks. That way, CartesianProduct can
cache the rightResult and register a cleanup handler (unpersist), so we avoid
recomputing RDD2.
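
A minimal sketch of what such a hook registry might look like (hypothetical:
SQLContext has no such API today, and CleanupHooks, registerCleanupHook, and
runCleanupHooks are names invented for illustration):

    import scala.collection.mutable.ArrayBuffer
    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // Hypothetical registry; none of these names exist in Spark.
    class CleanupHooks {
      private val hooks = ArrayBuffer.empty[() => Unit]

      def registerCleanupHook(hook: () => Unit): Unit = synchronized { hooks += hook }

      // Invoked once, after query execution finishes.
      def runCleanupHooks(): Unit = synchronized {
        hooks.foreach(h => h())
        hooks.clear()
      }
    }

    // How CartesianProduct might use it: cache the right side and schedule
    // its unpersist for after the query completes.
    def cachedCartesian[T, U: ClassTag](left: RDD[T], right: RDD[U],
        cleanup: CleanupHooks): RDD[(T, U)] = {
      val cachedRight = right.cache()
      cleanup.registerCleanupHook(() => cachedRight.unpersist())
      left.cartesian(cachedRight)
    }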

In my testing, because rdd2 was quite small, I actually reversed the Cartesian
join, rewriting cartesian(rdd1, rdd2) as cartesian(rdd2, rdd1). With that
change the computation finishes quite fast, whereas the original form could
not finish.
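
At the RDD level, that reversal is just the following sketch (the final swap
restores the original pair orientation):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD

    // cartesian() re-iterates its right-hand side once per element of each
    // left-hand partition, so the small RDD should go on the left.
    def reversedCartesian[T: ClassTag, U: ClassTag](
        large: RDD[T], small: RDD[U]): RDD[(T, U)] = {
      small.cartesian(large).map { case (u, t) => (t, u) }
    }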




[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join

2015-11-13, Takeshi Yamamuro (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005217#comment-15005217 ]

Takeshi Yamamuro commented on SPARK-11704:
------------------------------------------

You're right; they're not automatically cached.
I'm just saying that the earlier stages of rdd2 are skipped and the iterator
only fetches blocks from the remote BlockManager (the blocks were already
written out by the shuffle).
Do you mean that fetching remote blocks is too slow?

Anyway, adding a cleanup hook could have a big impact on the SparkPlan
interfaces. As an alternative idea, how about caching rdd2 in unsafe space,
using logic similar to UnsafeExternalSorter's?
We could release that space by using TaskContext#addTaskCompletionListener.
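
A minimal sketch of that release pattern (with a plain in-heap ArrayBuffer
standing in for real unsafe space; actual unsafe allocation through the
task's memory manager is omitted):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.TaskContext

    // Buffer the right side once per task and free it when the task
    // completes, whether it succeeds or fails.
    def bufferRightSide[U](context: TaskContext, rightIter: Iterator[U]): Seq[U] = {
      // Stand-in for an unsafe, spillable buffer (as in UnsafeExternalSorter).
      val buffer = ArrayBuffer.empty[U]
      rightIter.foreach(buffer += _)
      context.addTaskCompletionListener((_: TaskContext) => buffer.clear())
      buffer
    }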

If you don't have time, I'm okay with taking this on.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org