[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya closed the pull request at: https://github.com/apache/spark/pull/14452 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r94001153

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1167,3 +1173,211 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
           a.copy(groupingExpressions = newGrouping)
       }
     }
    +
    +/**
    + * Optimizes the logical plans wrapped in SubqueryAlias and operators on them.
    + * The SubqueryAlias which are remaining in optimization phase are common subqueries,
    + * i.e., they are duplicate in the whole query plan. The logical plans wrapped in
    + * SubqueryAlias will be executed individually later. However, some operators such as
    + * Project and Filter can be optimized with the wrapped logical plans. Thus, this rule
    + * considers the optimization of the wrapped logical plans and operators on SubqueryAlias.
    + */
    +case class OptimizeCommonSubqueries(optimizer: Optimizer)
    +    extends Rule[LogicalPlan] with PredicateHelper {
    +  // Optimized the subqueries which all have a Project parent node and the same results.
    +  private def optimizeProjectWithSubqueries(
    +      plan: LogicalPlan,
    +      keyPlan: LogicalPlan,
    +      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
    +    plan transform {
    +      case p @ Project(pList, s @ SubqueryAlias(alias, subquery, v, true))
    +          if s.sameResult(keyPlan) =>
    +        val pListForAll: Seq[NamedExpression] = subqueries.flatMap { case Project(pList, child) =>
    +          val rewrites = buildRewrites(child, subquery)
    +          pList.map(pushToOtherPlan(_, rewrites))
    +        }
    +
    +        val newSubquery = Project(pListForAll, subquery)
    +        val optimized = optimizer.execute(newSubquery)
    +        // Check if any optimization is performed.
    +        if (optimized.sameResult(newSubquery)) {
    +          // No optimization happens. Let's keep original subquery.
    +          p
    +        } else {
    +          Project(pList.map(_.toAttribute), SubqueryAlias(alias, newSubquery, v, true))
    +        }
    +    }
    +  }
    +
    +  /**
    +   * Maps Attributes from the source side to the corresponding Attribute on the target side.
    +   */
    +  private def buildRewrites(source: LogicalPlan, target: LogicalPlan): AttributeMap[Attribute] = {
    +    assert(source.output.size == target.output.size)
    +    AttributeMap(source.output.zip(target.output))
    +  }
    +
    +  /**
    +   * Rewrites an expression so that it can be pushed to another LogicalPlan.
    +   */
    +  private def pushToOtherPlan[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
    +    val result = e transformUp {
    +      case a: Attribute => rewrites.get(a).getOrElse(a)
    +    }
    +
    +    // We must promise the compiler that we did not discard the names in the case of project
    +    // expressions. This is safe since the only transformation is from Attribute => Attribute.
    +    result.asInstanceOf[A]
    +  }
    +
    +  private def optimizeFilterWithSubqueries(
    +      plan: LogicalPlan,
    +      keyPlan: LogicalPlan,
    +      subqueries: ArrayBuffer[LogicalPlan]): LogicalPlan = {
    +    var pushdownConds = splitConjunctivePredicates(subqueries(0).asInstanceOf[Filter].condition)
    +    subqueries.tail.foreach {
    +      case Filter(otherCond, child) =>
    +        val rewrites = buildRewrites(child, subqueries(0).asInstanceOf[Filter].child)
    +        // We can't simply push down all conditions from other Filter by concatenating them with
    --- End diff --

    This part has been extracted out as #15558 and can be removed if that PR is merged.
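The two helpers quoted in the diff above, `buildRewrites` and `pushToOtherPlan`, can be illustrated without Catalyst. The following is only a sketch under stated assumptions: `Expr`, `Attr`, `Lit` and `Add` are hypothetical stand-ins for Catalyst's expression classes, and a plain `Map` stands in for `AttributeMap`. It shows the idea of pairing the output columns of two plans by position and then rewriting an expression so that it refers to the other plan's columns.

```scala
object AttributeRewriteSketch {
  // Hypothetical toy expression tree; not Catalyst's Expression hierarchy.
  sealed trait Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Pair the output columns of the source plan with those of the target plan by position.
  def buildRewrites(source: Seq[Attr], target: Seq[Attr]): Map[Attr, Attr] = {
    require(source.size == target.size, "both plans must expose the same number of columns")
    source.zip(target).toMap
  }

  // Replace every attribute that has a mapping; leave everything else untouched.
  def pushToOtherPlan(e: Expr, rewrites: Map[Attr, Attr]): Expr = e match {
    case a: Attr   => rewrites.getOrElse(a, a)
    case l: Lit    => l
    case Add(l, r) => Add(pushToOtherPlan(l, rewrites), pushToOtherPlan(r, rewrites))
  }
}
```

In this toy version an expression such as `Add(Attr("a", 1), Lit(5))` written against the source columns becomes `Add(Attr("a", 10), Lit(5))` when `a#1` is mapped to `a#10`; attributes without a mapping are returned unchanged.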
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r77476688

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -89,6 +90,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
           CombineFilters,
           CombineLimits,
           CombineUnions,
    +      // Pushdown Filters again after combination
    --- End diff --

    #14912 has changed, but the new solution is more complicated, so I don't want to include it here. I will keep this change as it is and wait for #14912 to be merged first.
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r77446696

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.subquery
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.QueryPlan
    +import org.apache.spark.sql.catalyst.plans.logical
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.util.Utils
    +
    +private[sql] case class CommonSubquery(
    +    output: Seq[Attribute],
    +    @transient child: SparkPlan)(
    +    @transient val logicalChild: LogicalPlan,
    +    private[sql] val _statistics: Statistics,
    +    @transient private[sql] var _computedOutput: RDD[InternalRow] = null)
    --- End diff --

    @hvanhovell Rethinking this, it is indeed incorrect. I have changed it to use a helper class to do the RDD materialization.
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r77123756

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -89,6 +90,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
           CombineFilters,
           CombineLimits,
           CombineUnions,
    +      // Pushdown Filters again after combination
    --- End diff --

    This change has been submitted as #14912. Because the problem it addresses prevents this PR from pushing down some predicates, I have also included that change here for ease of testing.
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r77121433

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.subquery
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.QueryPlan
    +import org.apache.spark.sql.catalyst.plans.logical
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.util.Utils
    +
    +private[sql] case class CommonSubquery(
    +    output: Seq[Attribute],
    +    @transient child: SparkPlan)(
    +    @transient val logicalChild: LogicalPlan,
    +    private[sql] val _statistics: Statistics,
    +    @transient private[sql] var _computedOutput: RDD[InternalRow] = null)
    --- End diff --

    `_computedOutput` is used to keep the computed results across the common subqueries. Once the `CommonSubqueryExec` of any subquery among them is asked for its results, we run the executed plan and record the results in `_computedOutput`.
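The sharing scheme described in the comment above can be sketched in plain Scala. This is a minimal illustration, not the code from the PR: `SharedResult` is a hypothetical name, and an in-memory `Seq[Int]` stands in for the subquery output. It assumes that every consumer holds the same holder instance, so the first request triggers the computation and later requests reuse the recorded value.

```scala
// Hypothetical holder shared by all consumers of one common subquery.
final class SharedResult[T](compute: () => T) {
  // A lazy val is initialized at most once, and its initialization is synchronized.
  lazy val value: T = compute()
}

object SharedResultDemo {
  // Returns (number of executions, result seen by the first consumer, result seen by the second).
  def run(): (Int, Seq[Int], Seq[Int]) = {
    var executions = 0
    val shared = new SharedResult(() => { executions += 1; Seq(1, 2, 3) })
    // Two consumers that reference the same holder instance.
    val first = shared.value.map(_ * 2)
    val second = shared.value.filter(_ > 1)
    (executions, first, second)
  }
}
```

Both consumers read from the same holder, so the computation in this sketch runs once no matter which of them asks first.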
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r77108228

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.subquery
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.QueryPlan
    +import org.apache.spark.sql.catalyst.plans.logical
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.util.Utils
    +
    +private[sql] case class CommonSubquery(
    +    output: Seq[Attribute],
    +    @transient child: SparkPlan)(
    +    @transient val logicalChild: LogicalPlan,
    +    private[sql] val _statistics: Statistics,
    +    @transient private[sql] var _computedOutput: RDD[InternalRow] = null)
    --- End diff --

    I was thinking that `_computedOutput` would not be kept for all the `CommonSubquery` instances sharing it. But that is not true; it will be. So I think there is no problem.
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r77107621

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.subquery
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.QueryPlan
    +import org.apache.spark.sql.catalyst.plans.logical
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.util.Utils
    +
    +private[sql] case class CommonSubquery(
    +    output: Seq[Attribute],
    +    @transient child: SparkPlan)(
    +    @transient val logicalChild: LogicalPlan,
    +    private[sql] val _statistics: Statistics,
    +    @transient private[sql] var _computedOutput: RDD[InternalRow] = null)
    --- End diff --

    Oh, I made a mistake here. I will fix it.
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14452#discussion_r76997680

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery/CommonSubquery.scala ---
    @@ -0,0 +1,60 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.subquery
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.QueryPlan
    +import org.apache.spark.sql.catalyst.plans.logical
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
    +import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.util.Utils
    +
    +private[sql] case class CommonSubquery(
    +    output: Seq[Attribute],
    +    @transient child: SparkPlan)(
    +    @transient val logicalChild: LogicalPlan,
    +    private[sql] val _statistics: Statistics,
    +    @transient private[sql] var _computedOutput: RDD[InternalRow] = null)
    --- End diff --

    This does not guarantee that the result is materialized.
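The review concern above, that keeping a reference to a result does not mean the result has been materialized, can be shown with a Spark-free analogy. The sketch below is an assumption-laden stand-in: a Scala collection `view` plays the role of a lazily evaluated dataset, and a strict `Seq` plays the role of a materialized one. It counts how often the per-element function runs when each is traversed twice.

```scala
object LazyVsMaterialized {
  // Returns (evaluations through the lazy view, evaluations through the strict collection).
  def run(): (Int, Int) = {
    var lazyEvaluations = 0
    // A view only describes the computation; each traversal re-applies the function.
    val lazyRows = Seq(1, 2, 3).view.map { x => lazyEvaluations += 1; x * 10 }
    val lazyFirst = lazyRows.sum
    val lazySecond = lazyRows.sum

    var strictEvaluations = 0
    // A strict map computes the elements once and stores them.
    val materialized = Seq(1, 2, 3).map { x => strictEvaluations += 1; x * 10 }
    val strictFirst = materialized.sum
    val strictSecond = materialized.sum

    require(lazyFirst == lazySecond && strictFirst == strictSecond)
    (lazyEvaluations, strictEvaluations)
  }
}
```

Sharing the lazy description alone would still repeat the work on every traversal, which is the behaviour the reviewer is warning about; only the stored form avoids it.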
[GitHub] spark pull request #14452: [SPARK-16849][SQL] Improve subquery execution by ...
GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/14452

    [SPARK-16849][SQL] Improve subquery execution by deduplicating the subqueries with the same results

    ## What changes were proposed in this pull request?

    Subqueries in Spark SQL are each executed even when they have the same physical plan and produce the same results. We should be able to deduplicate such subqueries when they are referenced many times in a query.

    ## How was this patch tested?

    Jenkins tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 single-exec-subquery

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14452.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #14452

commit 00b29ede65b84e0fc99ab9e0ebd33f6092077bbc
Author: Liang-Chi Hsieh
Date:   2016-08-01T03:41:34Z

    Dedup common subqueries.
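The goal stated in the pull request description, running subqueries with the same results only once, can be sketched as follows. This is a toy model and not the PR's implementation: `Plan`, `Scan` and `Filter` are hypothetical case classes, and case-class equality is used as a simplified stand-in for Catalyst's `sameResult` check.

```scala
import scala.collection.mutable

object DedupSubqueries {
  // Hypothetical toy plans; structural equality approximates "same result".
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(predicate: String, child: Plan) extends Plan

  // Executes each distinct plan once and returns the per-plan results
  // together with the number of executions that were actually needed.
  def executeAll(plans: Seq[Plan])(execute: Plan => Seq[Int]): (Seq[Seq[Int]], Int) = {
    val cache = mutable.LinkedHashMap.empty[Plan, Seq[Int]]
    val results = plans.map(p => cache.getOrElseUpdate(p, execute(p)))
    (results, cache.size)
  }
}
```

With three subqueries of which two are structurally equal, `executeAll` in this sketch invokes `execute` twice and hands the duplicate the cached rows.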