[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005682#comment-15005682 ]

Zhan Zhang commented on SPARK-11704:

[~maropu] You are right; I mean that fetching from the network is a big overhead. Feel free to work on it.

> Optimize the Cartesian Join
> ---
>
> Key: SPARK-11704
> URL: https://issues.apache.org/jira/browse/SPARK-11704
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Zhan Zhang
>
> Currently CartesianProduct relies on RDD.cartesian, in which the computation is realized as follows:
>
>   override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
>     val currSplit = split.asInstanceOf[CartesianPartition]
>     for (x <- rdd1.iterator(currSplit.s1, context);
>          y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
>   }
>
> From the above loop, if rdd1.count is n, rdd2 needs to be recomputed n times, which is really heavy and may never finish if n is large, especially when rdd2 comes from a ShuffleRDD.
>
> We should optimize CartesianProduct by caching rightResults. The problem is that, AFAIK, we don't have a cleanup hook to unpersist rightResults. I think we should have a cleanup hook that runs after query execution. With that hook available, we can easily optimize such Cartesian joins, and I believe such a cleanup hook may also benefit other query optimizations.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
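The quoted compute() makes the cost pattern concrete: the for-comprehension rebuilds the right-hand iterator once per left element. Below is a minimal plain-Scala sketch (no Spark; all names are illustrative, not Spark internals) that counts those rebuilds, which is where the repeated shuffle fetch would happen.

```scala
object CartesianRecompute {
  // mkRight stands in for rdd2.iterator(...): each call "recomputes" the
  // right partition. The for-comprehension invokes it once per left element.
  def makeCartesian[A, B](left: Seq[A], mkRight: () => Iterator[B]): Iterator[(A, B)] =
    for (x <- left.iterator; y <- mkRight()) yield (x, y)

  def main(args: Array[String]): Unit = {
    var rightBuilds = 0
    val right = () => { rightBuilds += 1; Iterator(10, 20, 30) }
    val pairs = makeCartesian(Seq(1, 2, 3, 4), right).toList
    println(pairs.size)   // 4 left elements x 3 right elements = 12 pairs
    println(rightBuilds)  // right side rebuilt once per left element: 4
  }
}
```

If `mkRight` hid a network fetch of shuffle blocks, that fetch would be paid once per element of the left partition, which is the overhead this issue targets.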
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005111#comment-15005111 ]

Takeshi Yamamuro commented on SPARK-11704:

It seems to me that some earlier stages of rdd2 are skipped in every iteration except the first when rdd2 comes from a ShuffleRDD. That said, this optimization is worth doing.
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005134#comment-15005134 ]

Zhan Zhang commented on SPARK-11704:

[~maropu] Maybe I misunderstand. If rdd2 comes from a ShuffleRDD, each new iterator will try to fetch from the network because rdd2 is not cached. Is the ShuffleRDD cached automatically?
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005136#comment-15005136 ]

Zhan Zhang commented on SPARK-11704:

I think we can add a cleanup hook to SQLContext and, when the query is done, invoke all the registered cleanup hooks. That way, CartesianProduct can cache rightResults and register an unpersist handler as the cleanup, so we avoid recomputing rdd2.

In my testing, because rdd2 is quite small, I actually reversed the Cartesian join, turning cartesian(rdd1, rdd2) into cartesian(rdd2, rdd1). With that change the computation finishes quite fast, whereas the original form could not finish.
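The reversal works because the right side is rebuilt once per LEFT element, so putting the small relation on the left bounds how often the other side is re-iterated; flipping the emitted tuples restores the original order. A plain-Scala sketch (no Spark; names are illustrative):

```scala
object ReversedCartesian {
  // Returns the cross-product pairs plus how many times the right side was
  // rebuilt -- the quantity the reversal trick minimizes.
  def countingCartesian[A, B](left: Seq[A], right: Seq[B]): (List[(A, B)], Int) = {
    var rightBuilds = 0
    val pairs =
      (for (x <- left.iterator;
            y <- { rightBuilds += 1; right.iterator }) yield (x, y)).toList
    (pairs, rightBuilds)
  }

  def main(args: Array[String]): Unit = {
    val large = (1 to 1000).toList
    val small = List("a", "b")
    // Original order: small right side rebuilt once per large-side element.
    val (_, naiveBuilds) = countingCartesian(large, small)
    // Reversed order: the large side is rebuilt only small.size times; flip
    // each tuple to restore the original (large, small) order.
    val (flipped, reversedBuilds) = countingCartesian(small, large)
    val restored = flipped.map { case (s, l) => (l, s) }
    println(naiveBuilds)     // 1000
    println(reversedBuilds)  // 2
    println(restored.size)   // 2000 pairs either way
  }
}
```

In the issue's scenario the rebuild is a shuffle fetch, so cutting the rebuild count from |rdd1| to |rdd2| is what turned a never-finishing job into a fast one.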
[jira] [Commented] (SPARK-11704) Optimize the Cartesian Join
[ https://issues.apache.org/jira/browse/SPARK-11704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15005217#comment-15005217 ]

Takeshi Yamamuro commented on SPARK-11704:

You're right; they're not cached automatically. I am just saying that the earlier stages of rdd2 are skipped, and the iterator only fetches blocks from the remote BlockManager (the blocks are written by the ShuffleRDD). Do you mean that fetching remote blocks is too slow?

Anyway, adding a cleanup hook could have a big impact on the SparkPlan interfaces. As an alternative idea, how about caching rdd2 in unsafe space, with logic similar to UnsafeExternalSorter? We could release that space via TaskContext#addTaskCompletionListener. If you have no time, I'm okay to take it.
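Both proposals share one shape: register release callbacks while the work runs, then fire them all once it completes, even on failure (the guarantee TaskContext#addTaskCompletionListener gives per task). A plain-Scala analogue of the query-level registry idea (no Spark; all names are invented for the sketch):

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical query-scoped cleanup registry, sketching the SQLContext hook
// idea from the thread; not an actual Spark API.
class QueryContext {
  private val cleanupHooks = ArrayBuffer.empty[() => Unit]

  // e.g. registerCleanup(() => cachedRightResults.unpersist())
  def registerCleanup(hook: () => Unit): Unit = cleanupHooks += hook

  // Run the query body, then invoke every registered hook exactly once,
  // even if the body throws -- mirroring a task-completion listener.
  def runQuery[T](body: => T): T =
    try body
    finally {
      cleanupHooks.foreach(h => h())
      cleanupHooks.clear()
    }
}
```

With such a registry, CartesianProduct could persist its right side at planning time and register the unpersist as a hook, instead of leaking the cached data past the query.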