[GitHub] spark pull request #14452: [SPARK-16849][SQL][WIP] Improve subquery executio...

2016-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r78499987
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -268,3 +269,35 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 case operator: SparkPlan => ensureDistributionAndOrdering(operator)
   }
 }
+
+
+/**
+ * Pushs down [[ShuffleExchange]] on the top of [[CommonSubqueryExec]] to 
make them reusable.
+ */
+case object PushDownShuffleExchange extends Rule[SparkPlan] {
--- End diff --

I reverted this change because I didn't see any improvement with it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14452: [SPARK-16849][SQL][WIP] Improve subquery executio...

2016-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r78499975
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class SubqueryExecHelper(executedPlan: SparkPlan) {
+  private var _computedOutput: RDD[InternalRow] = null
+
+  def computeOrGetResult(): RDD[InternalRow] = this.synchronized {
+    if (_computedOutput == null) {
+      _computedOutput = executedPlan.execute().mapPartitionsInternal { rowIterator =>
+        rowIterator.map(_.copy())
+      }.cache()
--- End diff --

I reverted this change because I didn't see any improvement with it.





[GitHub] spark pull request #14452: [SPARK-16849][SQL][WIP] Improve subquery executio...

2016-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r78494742
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ---
@@ -268,3 +269,35 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 case operator: SparkPlan => ensureDistributionAndOrdering(operator)
   }
 }
+
+
+/**
+ * Pushs down [[ShuffleExchange]] on the top of [[CommonSubqueryExec]] to 
make them reusable.
+ */
+case object PushDownShuffleExchange extends Rule[SparkPlan] {
--- End diff --

A common pattern for common subqueries in TPC-DS is a join of two or more
instances of the same subquery. The join is a `SortMergeJoin`, and a
`ShuffleExchange` is executed for each subquery instance after some operators
(e.g., `Project`, `Filter`).

E.g.,

  : :- *SortMergeJoin [customer_id#898], [customer_id#912], Inner
  : :  :- *Sort [customer_id#898 ASC], false, 0
  : :  :  +- Exchange hashpartitioning(customer_id#898, 200)
  : :  :     +- *Project [customer_id#898, year_total#902]
  : :  :        +- *Filter (isnotnull(year#901) && isnotnull(year_total#902)) && (sale_type#903 = s)) && (year#901 = 2001)) && (year_total#902 > 0.0)) && isnotnull(customer_id#898))
  : :  :           +- CommonSubquery [customer_id#898, customer_first_name#899, customer_last_name#900, year#901, year_total#902, sale_type#903]

This rule pushes the `ShuffleExchange` down so that it sits directly on top of
`CommonSubqueryExec`. The other `ShuffleExchange`s can then be transformed to
`ReusedExchange` later, so the exchange is reused.
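
To illustrate the idea, here is a toy sketch in plain Scala. It is not the
rule from this PR: the case classes below are hypothetical stand-ins for the
physical operators, and the rewrite assumes that the partitioning keys are
already part of the subquery output (as `customer_id` is in the plan above).

```scala
// Toy plan model. These classes are hypothetical stand-ins, not Spark operators.
sealed trait Plan
case class CommonSubquery(id: Int) extends Plan
case class Filter(cond: String, child: Plan) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan
case class Exchange(keys: Seq[String], child: Plan) extends Plan
case class Join(left: Plan, right: Plan) extends Plan

object PushDownSketch {
  // Peels Project/Filter operators off a plan. Returns a function that rebuilds
  // them on top of a new child, plus the CommonSubquery leaf found underneath.
  private def peel(plan: Plan): Option[(Plan => Plan, CommonSubquery)] = plan match {
    case c: CommonSubquery => Some(((p: Plan) => p, c))
    case Filter(cond, child) =>
      peel(child).map { case (rebuild, leaf) => ((p: Plan) => Filter(cond, rebuild(p)), leaf) }
    case Project(cols, child) =>
      peel(child).map { case (rebuild, leaf) => ((p: Plan) => Project(cols, rebuild(p)), leaf) }
    case _ => None
  }

  // Moves each Exchange directly on top of the CommonSubquery beneath it, if any.
  def pushDown(plan: Plan): Plan = plan match {
    case Exchange(keys, child) =>
      peel(child) match {
        case Some((rebuild, leaf)) => rebuild(Exchange(keys, leaf))
        case None => Exchange(keys, pushDown(child))
      }
    case Filter(cond, child) => Filter(cond, pushDown(child))
    case Project(cols, child) => Project(cols, pushDown(child))
    case Join(left, right) => Join(pushDown(left), pushDown(right))
    case leaf: CommonSubquery => leaf
  }

  // Collects every Exchange in the plan, so identical ones can be detected.
  def exchanges(plan: Plan): Seq[Exchange] = plan match {
    case e @ Exchange(_, child) => e +: exchanges(child)
    case Filter(_, child) => exchanges(child)
    case Project(_, child) => exchanges(child)
    case Join(left, right) => exchanges(left) ++ exchanges(right)
    case _: CommonSubquery => Nil
  }

  // Two join sides over the same subquery, with different filters on top.
  def side(year: Int): Plan =
    Exchange(Seq("customer_id"),
      Project(Seq("customer_id", "year_total"),
        Filter(s"year = $year", CommonSubquery(1))))

  def main(args: Array[String]): Unit = {
    val before = Join(side(2001), side(2002))
    val after = pushDown(before)
    println(s"distinct exchanges before: ${exchanges(before).distinct.size}")
    println(s"distinct exchanges after:  ${exchanges(after).distinct.size}")
  }
}
```

In this toy model the two exchanges differ before the rewrite, because the
filters beneath them differ, and become identical afterwards, which is what
makes reuse possible. Note the trade-off: the shuffle now runs below the
`Filter`, so it moves the unfiltered rows of the subquery.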






[GitHub] spark pull request #14452: [SPARK-16849][SQL][WIP] Improve subquery executio...

2016-09-12 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r78494245
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class SubqueryExecHelper(executedPlan: SparkPlan) {
+  private var _computedOutput: RDD[InternalRow] = null
+
+  def computeOrGetResult(): RDD[InternalRow] = this.synchronized {
+    if (_computedOutput == null) {
+      _computedOutput = executedPlan.execute().mapPartitionsInternal { rowIterator =>
+        rowIterator.map(_.copy())
+      }.cache()
--- End diff --

We need to explicitly cache the result of the common subquery. In the RDD
implementation, if an RDD is not marked as cached, a later call to its
iterator triggers the computation again, even if the RDD has been
materialized before.
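
The recomputation can be observed with a small standalone sketch. This is
only an illustration, not code from this PR: it assumes Spark 2.0+ on the
classpath, runs in local mode, and the object and accumulator names are made
up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CacheRecomputeSketch {
  /** Returns (map evaluations without cache, map evaluations with cache). */
  def run(): (Long, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("cache-recompute-sketch")
    val sc = new SparkContext(conf)
    try {
      // Not cached: every action re-runs the map function over all elements.
      val uncachedEvals = sc.longAccumulator("uncachedEvals")
      val uncached = sc.parallelize(1 to 100, 4).map { x => uncachedEvals.add(1L); x }
      uncached.count()
      uncached.count()

      // Cached: the first action materializes the partitions and later
      // actions read the cached blocks instead of re-running the map.
      val cachedEvals = sc.longAccumulator("cachedEvals")
      val cached = sc.parallelize(1 to 100, 4).map { x => cachedEvals.add(1L); x }.cache()
      cached.count()
      cached.count()

      (uncachedEvals.sum, cachedEvals.sum)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (withoutCache, withCache) = run()
    println(s"map evaluations without cache: $withoutCache, with cache: $withCache")
  }
}
```

On a local run without task retries, the uncached counter should be about
twice the cached one, since the second `count()` evaluates the whole lineage
again.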



