[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2017-03-07 Thread viirya
Github user viirya closed the pull request at:

https://github.com/apache/spark/pull/14452


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-12-27 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r94001153
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1167,3 +1173,211 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
       a.copy(groupingExpressions = newGrouping)
   }
 }
+
+/**
+ * Optimizes the logical plans wrapped in SubqueryAlias and operators on them.
+ * The SubqueryAlias which are remaining in optimization phase are common subqueries,
+ * i.e., they are duplicate in the whole query plan. The logical plans wrapped in
+ * SubqueryAlias will be executed individually later. However, some operators such as
+ * Project and Filter can be optimized with the wrapped logical plans. Thus, this rule
+ * considers the optimization of the wrapped logical plans and operators on SubqueryAlias.
+ */
+case class OptimizeCommonSubqueries(optimizer: Optimizer)
+    extends Rule[LogicalPlan] with PredicateHelper {
+  // Optimized the subqueries which all have a Project parent node and the same results.
+  private def optimizeProjectWithSubqueries(
+      plan: LogicalPlan,
+      keyPlan: LogicalPlan,
+      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
+    plan transform {
+      case p @ Project(pList, s @ SubqueryAlias(alias, subquery, v, true))
+          if s.sameResult(keyPlan) =>
+        val pListForAll: Seq[NamedExpression] = subqueries.flatMap { case Project(pList, child) =>
+          val rewrites = buildRewrites(child, subquery)
+          pList.map(pushToOtherPlan(_, rewrites))
+        }
+
+        val newSubquery = Project(pListForAll, subquery)
+        val optimized = optimizer.execute(newSubquery)
+        // Check if any optimization is performed.
+        if (optimized.sameResult(newSubquery)) {
+          // No optimization happens. Let's keep original subquery.
+          p
+        } else {
+          Project(pList.map(_.toAttribute), SubqueryAlias(alias, newSubquery, v, true))
+        }
+    }
+  }
+
+  /**
+   * Maps Attributes from the source side to the corresponding Attribute on the target side.
+   */
+  private def buildRewrites(source: LogicalPlan, target: LogicalPlan): AttributeMap[Attribute] = {
+    assert(source.output.size == target.output.size)
+    AttributeMap(source.output.zip(target.output))
+  }
+
+  /**
+   * Rewrites an expression so that it can be pushed to another LogicalPlan.
+   */
+  private def pushToOtherPlan[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
+    val result = e transformUp {
+      case a: Attribute => rewrites.get(a).getOrElse(a)
+    }
+
+    // We must promise the compiler that we did not discard the names in the case of project
+    // expressions.  This is safe since the only transformation is from Attribute => Attribute.
+    result.asInstanceOf[A]
+  }
+
+  private def optimizeFilterWithSubqueries(
+      plan: LogicalPlan,
+      keyPlan: LogicalPlan,
+      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
+    var pushdownConds = splitConjunctivePredicates(subqueries(0).asInstanceOf[Filter].condition)
+    subqueries.tail.foreach {
+      case Filter(otherCond, child) =>
+        val rewrites = buildRewrites(child, subqueries(0).asInstanceOf[Filter].child)
+        // We can't simply push down all conditions from other Filter by concatenating them with
--- End diff --

This part has been extracted out as #15558 and can be removed if that PR is 
merged.





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-09-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r77476688
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -89,6 +90,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, 
conf: CatalystConf)
   CombineFilters,
   CombineLimits,
   CombineUnions,
+  // Pushdown Filters again after combination
--- End diff --

#14912 has changed, but the new solution is more complicated, so I don't 
want to include it here. I will keep this change and wait for #14912 to be 
merged first.





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-09-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r77446696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class CommonSubquery(
+output: Seq[Attribute],
+@transient child: SparkPlan)(
+@transient val logicalChild: LogicalPlan,
+private[sql] val _statistics: Statistics,
+@transient private[sql] var _computedOutput: RDD[InternalRow] = null)
--- End diff --

@hvanhovell Having thought about this again, it is indeed incorrect. I have 
changed it to use a helper class to do the RDD materialization.
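
The difference between holding a reference to a lazy computation and holding materialized rows can be sketched in plain Scala. This is only an illustration under stated assumptions: `Materializer` is a hypothetical name, an `Iterator` factory stands in for an RDD, and none of this is the helper class from the PR.

```scala
// Hypothetical helper; a plain Scala Iterator factory stands in for an RDD.
final class Materializer[T](source: () => Iterator[T]) {
  // lazy val: the source is drained at most once, and initialization is thread-safe.
  lazy val rows: Vector[T] = source().toVector
}

object MaterializerDemo {
  def main(args: Array[String]): Unit = {
    var evaluations = 0
    // Each call to `source` redoes the work, like re-running an unmaterialized plan.
    val source = () => { evaluations += 1; Iterator(1, 2, 3) }

    // Keeping only the recipe: every consumer triggers the work again.
    val first = source().sum
    val second = source().sum
    println(s"$first $second evaluations=$evaluations")

    // Keeping materialized rows: the work happens once, on first access.
    val m = new Materializer(source)
    val third = m.rows.sum
    val fourth = m.rows.sum
    println(s"$third $fourth evaluations=$evaluations")
  }
}
```

Storing the recipe alone leaves every consumer free to recompute it, which is the concern raised in review; forcing the rows inside one shared holder is one way to avoid that.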





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-09-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r77123756
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -89,6 +90,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, 
conf: CatalystConf)
   CombineFilters,
   CombineLimits,
   CombineUnions,
+  // Pushdown Filters again after combination
--- End diff --

This change is submitted as #14912. Because the issue it addresses prevents 
this PR from pushing down some predicates, I also include that change here for 
ease of testing.





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-09-01 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r77121433
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class CommonSubquery(
+output: Seq[Attribute],
+@transient child: SparkPlan)(
+@transient val logicalChild: LogicalPlan,
+private[sql] val _statistics: Statistics,
+@transient private[sql] var _computedOutput: RDD[InternalRow] = null)
--- End diff --

`_computedOutput` keeps the computed results so that they can be shared across 
the common subqueries. Once the `CommonSubqueryExec` of any of those subqueries 
is asked for its results, we run the executed plan and record the results in 
`_computedOutput`.
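
The compute-once, share-everywhere behaviour described here can be sketched in plain Scala. All names below (`SharedResult`, `SubqueryRef`) are hypothetical and no Spark classes are involved; it only illustrates the idea that the first consumer triggers execution and later consumers read the recorded result.

```scala
// Hypothetical names throughout; plain Scala, no Spark classes involved.
final class SharedResult[T](compute: () => T) {
  private var cached: Option[T] = None
  var runs: Int = 0 // counts real executions, for demonstration only

  // The first caller runs the computation; later callers reuse the recorded value.
  def get: T = synchronized {
    cached match {
      case Some(v) => v
      case None =>
        runs += 1
        val v = compute()
        cached = Some(v)
        v
    }
  }
}

// Stand-in for a plan node that reads the common subquery.
final case class SubqueryRef(name: String, shared: SharedResult[Vector[Int]]) {
  def execute(): Vector[Int] = shared.get
}

object SharedResultDemo {
  def main(args: Array[String]): Unit = {
    val shared = new SharedResult(() => Vector(1, 2, 3).map(_ * 10))
    val a = SubqueryRef("a", shared)
    val b = SubqueryRef("b", shared)
    println(a.execute()) // triggers the computation
    println(b.execute()) // reuses the recorded result
    println(shared.runs)
  }
}
```

The sketch only works because both references hold the same `SharedResult` instance; if a node were copied with a fresh holder, the sharing would be lost.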





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-08-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r77108228
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class CommonSubquery(
+output: Seq[Attribute],
+@transient child: SparkPlan)(
+@transient val logicalChild: LogicalPlan,
+private[sql] val _statistics: Statistics,
+@transient private[sql] var _computedOutput: RDD[InternalRow] = null)
--- End diff --

I was thinking that `_computedOutput` would not be kept for all the 
`CommonSubquery` nodes sharing it. But that is not true: it will be. So I 
think there is no problem.





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-08-31 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r77107621
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class CommonSubquery(
+output: Seq[Attribute],
+@transient child: SparkPlan)(
+@transient val logicalChild: LogicalPlan,
+private[sql] val _statistics: Statistics,
+@transient private[sql] var _computedOutput: RDD[InternalRow] = null)
--- End diff --

Oh, I made a mistake here. I will fix it.





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-08-31 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14452#discussion_r76997680
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.subquery
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
Statistics}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.util.Utils
+
+private[sql] case class CommonSubquery(
+output: Seq[Attribute],
+@transient child: SparkPlan)(
+@transient val logicalChild: LogicalPlan,
+private[sql] val _statistics: Statistics,
+@transient private[sql] var _computedOutput: RDD[InternalRow] = null)
--- End diff --

This does not guarantee that the result is materialized.





[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...

2016-08-01 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/14452

[SPARK-16849][SQL] Improve subquery execution by deduplicating the 
subqueries with the same results

## What changes were proposed in this pull request?

Subqueries in Spark SQL are each run separately even when they have the same 
physical plan and output the same results. We should be able to deduplicate 
subqueries that are referenced many times in a query.
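
As a rough illustration of the idea (not the implementation in this PR), the sketch below runs each distinct subquery once and reuses the result for equal ones. The `Plan`, `Scan`, and `Filter` types and the `DedupDemo` object are hypothetical toy definitions, and case-class equality stands in for the `sameResult` check that the patch relies on.

```scala
import scala.collection.mutable

// Toy plan types; hypothetical and unrelated to Catalyst's real classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Filter(minValue: Int, child: Plan) extends Plan

object DedupDemo {
  var executions = 0 // counts top-level subquery executions, for demonstration

  val tables: Map[String, Vector[Int]] = Map("t" -> Vector(1, 5, 9))

  def run(plan: Plan): Vector[Int] = plan match {
    case Scan(t)            => tables(t)
    case Filter(min, child) => run(child).filter(_ >= min)
  }

  // Executes each distinct plan once and reuses the result for equal plans.
  // Structural equality of the case classes is the assumed "same result" test.
  def runAll(subqueries: Seq[Plan]): Seq[Vector[Int]] = {
    val cache = mutable.Map.empty[Plan, Vector[Int]]
    subqueries.map { p =>
      cache.getOrElseUpdate(p, { executions += 1; run(p) })
    }
  }

  def main(args: Array[String]): Unit = {
    val q = Filter(5, Scan("t"))
    val results = runAll(Seq(q, Filter(5, Scan("t")), Scan("t")))
    println(results)
    println(executions) // two distinct plans among three subqueries
  }
}
```

Structural equality is stricter than a semantic same-result check, so a real optimizer would catch more duplicates than this toy cache does.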

## How was this patch tested?

Jenkins tests.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 single-exec-subquery

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14452.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14452


commit 00b29ede65b84e0fc99ab9e0ebd33f6092077bbc
Author: Liang-Chi Hsieh 
Date:   2016-08-01T03:41:34Z

Dedup common subqueries.



