Repository: spark
Updated Branches:
  refs/heads/master 54032682b -> 928845a42

[SPARK-24172][SQL] we should not apply operator pushdown to data source v2 many times

## What changes were proposed in this pull request?

In `PushDownOperatorsToDataSource`, we use `transformUp` to match `PhysicalOperation` and apply pushdown. This is problematic if we have multiple `Filter` and `Project` above the data source v2 relation.

e.g. for a query
```
Project
  Filter
    DataSourceV2Relation
```

The pattern match will be triggered twice and we will do operator pushdown twice. This is unnecessary, we can use `mapChildren` to only apply pushdown once.

## How was this patch tested?

existing test

Author: Wenchen Fan <wenc...@databricks.com>

Closes #21230 from cloud-fan/step2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/928845a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/928845a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/928845a4

Branch: refs/heads/master
Commit: 928845a42230a2c0a318011002a54ad871468b2e
Parents: 5403268
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri May 11 10:00:28 2018 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri May 11 10:00:28 2018 -0700

----------------------------------------------------------------------
 .../v2/PushDownOperatorsToDataSource.scala | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/928845a4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
index 9293d4f..e894f8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
@@ -23,17 +23,10 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.catalyst.rules.Rule
 
 object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
-  override def apply(
-      plan: LogicalPlan): LogicalPlan = plan transformUp {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
     // PhysicalOperation guarantees that filters are deterministic; no need to check
-    case PhysicalOperation(project, newFilters, relation : DataSourceV2Relation) =>
-      // merge the filters
-      val filters = relation.filters match {
-        case Some(existing) =>
-          existing ++ newFilters
-        case _ =>
-          newFilters
-      }
+    case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
+      assert(relation.filters.isEmpty, "data source v2 should do push down only once.")
 
       val projectAttrs = project.map(_.toAttribute)
       val projectSet = AttributeSet(project.flatMap(_.references))
@@ -67,5 +60,7 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] {
       } else {
         filtered
       }
+
+    case other => other.mapChildren(apply)
   }
 }
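
Illustrative note: the difference between the two traversal strategies in this change can be reproduced without Spark. The sketch below is a toy model under stated assumptions; `Plan`, `Relation`, `Filter`, `Project`, `Limit`, `Chain`, `viaTransformUp` and `viaMapChildren` are hypothetical stand-ins invented for this example and are not Catalyst classes. `Relation.pushdowns` simply counts how many times the rule fired on the same relation. In this toy the extractor also matches a bare relation, so the bottom-up version fires three times here; the point is only that it fires more than once, while the `match` plus `mapChildren` version fires once.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst API.
object PushdownSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(newChildren: Seq[Plan]): Plan
    // Rebuild this node with f applied to each direct child.
    def mapChildren(f: Plan => Plan): Plan = withChildren(children.map(f))
    // Bottom-up rewrite: rewrite the children first, then try the rule on the rebuilt node.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val rebuilt = mapChildren(_.transformUp(rule))
      if (rule.isDefinedAt(rebuilt)) rule(rebuilt) else rebuilt
    }
  }

  // pushdowns counts how often the pushdown rule was applied to this relation.
  final case class Relation(pushdowns: Int = 0) extends Plan {
    def children: Seq[Plan] = Nil
    def withChildren(newChildren: Seq[Plan]): Plan = this
  }
  final case class Filter(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }
  final case class Project(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }
  final case class Limit(child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
    def withChildren(newChildren: Seq[Plan]): Plan = copy(child = newChildren.head)
  }

  // Loosely mimics a PhysicalOperation-style match: strips any Project/Filter
  // chain and yields the relation underneath, if there is one.
  object Chain {
    def unapply(plan: Plan): Option[Relation] = plan match {
      case Project(child) => unapply(child)
      case Filter(child)  => unapply(child)
      case r: Relation    => Some(r)
      case _              => None
    }
  }

  // Old shape: the rule is tried at every node on the way up.
  def viaTransformUp(plan: Plan): Plan = plan.transformUp {
    case Chain(r) => Relation(r.pushdowns + 1)
  }

  // New shape: match the whole chain once from the top; otherwise recurse into children.
  def viaMapChildren(plan: Plan): Plan = plan match {
    case Chain(r) => Relation(r.pushdowns + 1)
    case other    => other.mapChildren(viaMapChildren)
  }

  def main(args: Array[String]): Unit = {
    val plan = Limit(Project(Filter(Relation())))
    println(viaTransformUp(plan)) // prints Limit(Relation(3))
    println(viaMapChildren(plan)) // prints Limit(Relation(1))
  }
}
```

Once the top-down match consumes the whole `Project`/`Filter` chain, nothing below it is visited again, which is what makes a "push down only once" assertion like the one added in this commit safe to state.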