aokolnychyi commented on a change in pull request #32921: URL: https://github.com/apache/spark/pull/32921#discussion_r654044355
########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates + * Spark infers by reusing parts of the query. + * + * @since 3.2.0 + */ +@Experimental +public interface SupportsDynamicFiltering extends Scan { + /** + * Returns attributes this scan can be dynamically filtered by. + * <p> + * Spark will call {@link #filter(Filter[])} if it can derive a dynamic + * predicate for any of the filter attributes. + */ + NamedReference[] filterAttributes(); + + /** + * Dynamically filters this scan. + * <p> + * The provided expressions must be interpreted as a set of filters that are ANDed together. + * Implementations may use the filters to prune originally planned {@link InputPartition}s. 
+ * <p> + * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or + * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically. + * The newly produced {@link Batch} or its streaming alternative may report a subset of + * originally planned {@link InputPartition}s. + * + * @param filters dynamic filters + */ + void filter(Filter[] filters); Review comment: I think that would be best, @cloud-fan. Has there been any discussion on what the new API should look like? Since the old API has been exposed in `SupportsPushDownFilters`, what is the plan for introducing the new API? Will we introduce a new method with a default implementation that would translate to the old API?
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ########## @@ -96,6 +96,7 @@ case class AdaptiveSparkPlanExec( @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( PlanAdaptiveDynamicPruningFilters(this), ReuseAdaptiveSubquery(context.subqueryCache), + PrepareScans, Review comment: I think @cloud-fan is right. If I understand correctly, `AdaptiveSparkPlanExec` runs `queryStagePreparationRules` that adds shuffles on the input plan to get the initial physical plan. That plan is further analyzed and is split into query stages based on shuffles.
Once Spark creates stages in a plan, it applies `queryStageOptimizerRules` and `postStageCreationRules` on each of them. A stage may have a join without shuffles (e.g. if the scan output partitioning is `SinglePartition`). If we dynamically filter the scan and it now reports 0 partitions, the stage execution will fail as the output partitioning is no longer valid. The problem is that we need to run filters after `PlanAdaptiveDynamicPruningFilters` which is part of the query optimizer rules in AQE. Any thoughts, @cloud-fan @sunchao? Is it possible to run `queryStagePreparationRules` before executing the stage? ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ########## @@ -227,3 +228,14 @@ object ReuseSubquery extends Rule[SparkPlan] { } } } + +object PrepareScans extends Rule[SparkPlan] { + def apply(plan: SparkPlan): SparkPlan = { + val scans = plan.collect { + case scan: BatchScanExec => scan + } + scans.foreach(_.prepare()) Review comment: Sure! Will move to a new file. ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can Review comment: Agreed. Will add the details. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala ########## @@ -66,10 +71,33 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } case _ => None } + case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsDynamicFiltering, _)) => + val filterAttrs = AttributeSet(resolveRefs(r, scan.filterAttributes)) + if (resExp.references.subsetOf(filterAttrs)) { + Some(r) + } else { + None + } case _ => None } } + // TODO: is there a good place to put this? + private def resolveRefs( Review comment: Any existing classes we can use? Seems like we would need a new utility class. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala ########## @@ -66,10 +71,33 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } case _ => None } + case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsDynamicFiltering, _)) => + val filterAttrs = AttributeSet(resolveRefs(r, scan.filterAttributes)) Review comment: +1 on skipping rather than failing if we cannot resolve the filter attrs. W.r.t. which columns to use, I did use the scan output to resolve on purpose as we cannot derive a filter on an attribute that hasn't been projected. 
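A rough sketch of the "skip rather than fail" behaviour discussed above, assuming attribute names are matched exactly against the scan output. `FilterAttrResolver`, `resolve`, and `canFilterBy` are hypothetical names used only for illustration; the real rule works on Catalyst attributes, not strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified stand-in for resolving a scan's filter attributes against its output.
// All names here are hypothetical; this is not Spark's resolver.
final class FilterAttrResolver {
  private FilterAttrResolver() {}

  /**
   * Resolves each requested filter attribute against the projected scan output.
   * Returns Optional.empty() if any attribute is missing, so the caller can skip
   * dynamic filtering instead of failing the query.
   */
  static Optional<List<String>> resolve(List<String> scanOutput, List<String> filterAttrs) {
    List<String> resolved = new ArrayList<>();
    for (String attr : filterAttrs) {
      if (!scanOutput.contains(attr)) {
        return Optional.empty();  // unresolved attribute: skip, do not throw
      }
      resolved.add(attr);
    }
    return Optional.of(resolved);
  }

  /** A predicate is usable only if every column it references is a resolved filter attribute. */
  static boolean canFilterBy(
      List<String> predicateRefs, List<String> scanOutput, List<String> filterAttrs) {
    return resolve(scanOutput, filterAttrs)
        .map(attrs -> attrs.containsAll(predicateRefs))
        .orElse(false);
  }
}
```

Resolving against the scan output rather than the full table schema means an attribute that was pruned from the projection simply disables the filter, which matches the intent stated above.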
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala ########## @@ -17,38 +17,84 @@ package org.apache.spark.sql.execution.datasources.v2 +import com.google.common.base.Objects + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsDynamicFiltering} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy /** * Physical plan node for scanning a batch of data from a data source v2. */ case class BatchScanExec( output: Seq[AttributeReference], - @transient scan: Scan) extends DataSourceV2ScanExecBase { + @transient scan: Scan, + dynamicFilters: Seq[Expression]) extends DataSourceV2ScanExecBase { @transient lazy val batch = scan.toBatch + private lazy val runnableDynamicFilters = dynamicFilters.collect { + case e: DynamicPruningExpression if e.child != Literal.TrueLiteral => e + } + // TODO: unify the equal/hashCode implementation for all data source v2 query plans. 
override def equals(other: Any): Boolean = other match { - case other: BatchScanExec => this.batch == other.batch + case other: BatchScanExec => + this.batch == other.batch && this.dynamicFilters == other.dynamicFilters case _ => false } - override def hashCode(): Int = batch.hashCode() + override def hashCode(): Int = Objects.hashCode(batch, dynamicFilters) + + @transient private lazy val originalPartitions = batch.planInputPartitions() + @transient private var filteredPartitions: Option[Seq[InputPartition]] = None - @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + override def partitions: Seq[InputPartition] = filteredPartitions.getOrElse(originalPartitions) - override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() + private lazy val originalReaderFactory = batch.createReaderFactory() + private var filteredReaderFactory: Option[PartitionReaderFactory] = None + + override def readerFactory: PartitionReaderFactory = { + filteredReaderFactory.getOrElse(originalReaderFactory) + } override lazy val inputRDD: RDD[InternalRow] = { new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics) } override def doCanonicalize(): BatchScanExec = { - this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output))) + this.copy( + output = output.map(QueryPlan.normalizeExpressions(_, output)), + dynamicFilters = QueryPlan.normalizePredicates( + dynamicFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)), + output)) + } + + override protected def doPrepare(): Unit = { + if (runnableDynamicFilters.nonEmpty) { + // TODO: this triggers a broadcast which we don't need + waitForSubqueries() + + val dataSourceFilters = runnableDynamicFilters.flatMap { + case DynamicPruningExpression(e) => DataSourceStrategy.translateDynamicFilter(e) + case _ => None + } + + if (dataSourceFilters.isEmpty) { + logWarning("Skipping dynamic filtering, could not derive source 
filters") + return + } + + val filterableScan = scan.asInstanceOf[SupportsDynamicFiltering] + filterableScan.filter(dataSourceFilters.toArray) + + val filteredBatch = scan.toBatch + + filteredPartitions = Some(filteredBatch.planInputPartitions()) Review comment: @viirya is correct. Usually, there is no second planning. Instead, existing input partitions that have already been planned are filtered using dynamic filters. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala ########## @@ -17,38 +17,84 @@ package org.apache.spark.sql.execution.datasources.v2 +import com.google.common.base.Objects + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan -import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan} +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsDynamicFiltering} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy /** * Physical plan node for scanning a batch of data from a data source v2. */ case class BatchScanExec( output: Seq[AttributeReference], - @transient scan: Scan) extends DataSourceV2ScanExecBase { + @transient scan: Scan, + dynamicFilters: Seq[Expression]) extends DataSourceV2ScanExecBase { @transient lazy val batch = scan.toBatch + private lazy val runnableDynamicFilters = dynamicFilters.collect { + case e: DynamicPruningExpression if e.child != Literal.TrueLiteral => e + } + // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match { - case other: BatchScanExec => this.batch == other.batch + case other: BatchScanExec => + this.batch == other.batch && this.dynamicFilters == other.dynamicFilters case _ => false } - override def hashCode(): Int = batch.hashCode() + override def hashCode(): Int = Objects.hashCode(batch, dynamicFilters) + + @transient private lazy val originalPartitions = batch.planInputPartitions() + @transient private var filteredPartitions: Option[Seq[InputPartition]] = None - @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() + override def partitions: Seq[InputPartition] = filteredPartitions.getOrElse(originalPartitions) - override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory() + private lazy val originalReaderFactory = batch.createReaderFactory() + private var filteredReaderFactory: Option[PartitionReaderFactory] = None + + override def readerFactory: PartitionReaderFactory = { + filteredReaderFactory.getOrElse(originalReaderFactory) + } override lazy val inputRDD: RDD[InternalRow] = { new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics) } override def doCanonicalize(): BatchScanExec = { - this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output))) + this.copy( + output = output.map(QueryPlan.normalizeExpressions(_, output)), + dynamicFilters = QueryPlan.normalizePredicates( + dynamicFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)), + output)) + } + + override protected def doPrepare(): Unit = { + if (runnableDynamicFilters.nonEmpty) { + // TODO: this triggers a broadcast which we don't need Review comment: Will do. I am not sure about this point so we will need to discuss it a little bit. 
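For the `filteredPartitions` path in the `doPrepare` hunk quoted above, a minimal sketch of what a source might do when `filter` is called: keep the partitions it already planned and narrow them with the ANDed filters, without planning a second time. `RuntimeFilterableScanSketch` is a hypothetical class, and partitions are modelled as plain strings purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a scan whose partitions are planned once and then
// narrowed at runtime. None of these names are Spark APIs.
final class RuntimeFilterableScanSketch {
  private final List<String> plannedPartitions;  // result of the one-time planning
  private List<String> currentPartitions;        // what the scan reports right now

  RuntimeFilterableScanSketch(List<String> plannedPartitions) {
    this.plannedPartitions = new ArrayList<>(plannedPartitions);
    this.currentPartitions = this.plannedPartitions;
  }

  /** Keeps only planned partitions that satisfy every filter (filters are ANDed). */
  void filter(List<Predicate<String>> filters) {
    List<String> kept = new ArrayList<>();
    for (String partition : plannedPartitions) {
      boolean matchesAll = true;
      for (Predicate<String> f : filters) {
        if (!f.test(partition)) {
          matchesAll = false;
          break;
        }
      }
      if (matchesAll) {
        kept.add(partition);
      }
    }
    currentPartitions = kept;
  }

  /** Returns a copy of the partitions the scan would report after any filtering. */
  List<String> partitions() {
    return new ArrayList<>(currentPartitions);
  }
}
```

Because the filtered list is always derived from the originally planned partitions, the scan can only ever report a subset of what it planned, which is the contract described in the `filter` Javadoc.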
########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates + * Spark infers by reusing parts of the query. + * + * @since 3.2.0 + */ +@Experimental +public interface SupportsDynamicFiltering extends Scan { Review comment: I did use `dynamicXXX` as it is common throughout the code but `SupportsRuntimeFiltering` does sound more accurate. I'll update. ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates Review comment: Well, we do filter input partitions but the filtering can be using a metadata column (e.g. file name). ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates + * Spark infers by reusing parts of the query. + * + * @since 3.2.0 + */ +@Experimental +public interface SupportsDynamicFiltering extends Scan { + /** + * Returns attributes this scan can be dynamically filtered by. + * <p> + * Spark will call {@link #filter(Filter[])} if it can derive a dynamic + * predicate for any of the filter attributes. + */ + NamedReference[] filterAttributes(); + + /** + * Dynamically filters this scan. + * <p> + * The provided expressions must be interpreted as a set of filters that are ANDed together. + * Implementations may use the filters to prune originally planned {@link InputPartition}s. + * <p> + * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or + * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically. Review comment: I don't think it works for streaming plans right now. Shall I just refer to `toBatch` in the doc? ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ########## @@ -227,3 +228,14 @@ object ReuseSubquery extends Rule[SparkPlan] { } } } + +object PrepareScans extends Rule[SparkPlan] { + def apply(plan: SparkPlan): SparkPlan = { + val scans = plan.collect { + case scan: BatchScanExec => scan Review comment: I'd like to avoid that too. I am using `prepare` here as it is idempotent. If we return a new `BatchScanExec`, will that mean the subqueries will be executed twice as the new plan won't be prepared? 
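To make the concern above concrete, here is a toy model of an idempotent `prepare` guarded by a flag, and of what happens when the node is copied. `ScanNodeSketch` and its counter are hypothetical; whether Spark would actually re-run the subqueries for a new `BatchScanExec` is exactly the open question, so this only illustrates the worry and is not a claim about Spark's behaviour.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy plan node: prepare() does its work at most once per instance.
// Hypothetical names throughout; this does not mirror SparkPlan's internals.
final class ScanNodeSketch {
  private final AtomicInteger preparationRuns;  // shared counter standing in for subquery work
  private boolean prepared = false;

  ScanNodeSketch(AtomicInteger preparationRuns) {
    this.preparationRuns = preparationRuns;
  }

  /** Idempotent: repeated calls on the same instance do the expensive work once. */
  synchronized void prepare() {
    if (!prepared) {
      preparationRuns.incrementAndGet();  // stand-in for waiting on subqueries and filtering
      prepared = true;
    }
  }

  /** A copy is a fresh node, so the prepared flag is not carried over. */
  ScanNodeSketch copy() {
    return new ScanNodeSketch(preparationRuns);
  }
}
```

Calling `prepare()` twice on one node bumps the counter once, while calling it on a copy bumps it again, which is why mutating the existing node in place is attractive here.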
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala ########## @@ -66,10 +71,33 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join } case _ => None } + case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsDynamicFiltering, _)) => + val filterAttrs = AttributeSet(resolveRefs(r, scan.filterAttributes)) + if (resExp.references.subsetOf(filterAttrs)) { + Some(r) + } else { + None + } case _ => None } } + // TODO: is there a good place to put this? + private def resolveRefs( Review comment: I think this can go in a separate PR before this PR once the review is done. ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates + * Spark infers by reusing parts of the query. + * + * @since 3.2.0 + */ +@Experimental +public interface SupportsDynamicFiltering extends Scan { + /** + * Returns attributes this scan can be dynamically filtered by. + * <p> + * Spark will call {@link #filter(Filter[])} if it can derive a dynamic + * predicate for any of the filter attributes. + */ + NamedReference[] filterAttributes(); + + /** + * Dynamically filters this scan. + * <p> + * The provided expressions must be interpreted as a set of filters that are ANDed together. + * Implementations may use the filters to prune originally planned {@link InputPartition}s. + * <p> + * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or + * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically. + * The newly produced {@link Batch} or its streaming alternative may report a subset of + * originally planned {@link InputPartition}s. + * + * @param filters dynamic filters + */ + void filter(Filter[] filters); Review comment: Thanks for the pointer, @sunchao. 
I think @rdblue's comment [here](https://github.com/dbtsai/spark/pull/10/files#r368072666) matches my proposal above about adding v2 filters in parallel and having a default implementation that converts v2 to v1. I can pick up the v2 filter API but I'd like to do that independently of this PR if everyone is on board. ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates + * Spark infers by reusing parts of the query. + * + * @since 3.2.0 + */ +@Experimental +public interface SupportsDynamicFiltering extends Scan { + /** + * Returns attributes this scan can be dynamically filtered by. 
+ * <p> + * Spark will call {@link #filter(Filter[])} if it can derive a dynamic + * predicate for any of the filter attributes. + */ + NamedReference[] filterAttributes(); + + /** + * Dynamically filters this scan. + * <p> + * The provided expressions must be interpreted as a set of filters that are ANDed together. + * Implementations may use the filters to prune originally planned {@link InputPartition}s. + * <p> + * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or + * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically. Review comment: Agreed. I'll update. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ########## @@ -96,6 +96,7 @@ case class AdaptiveSparkPlanExec( @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( PlanAdaptiveDynamicPruningFilters(this), ReuseAdaptiveSubquery(context.subqueryCache), + PrepareScans, Review comment: While it would be easier to not allow data sources to change the partitioning during dynamic filtering, I am really trying to avoid that as it would be a substantial limitation. I mention this in the "Output Partitioning and Dynamic Filtering" section of the design doc. There are primarily two problems. First, if dynamic filtering is selective, this can lead to a lot of (almost) empty tasks being triggered hurting the performance. Second, it is hard to check whether the output partitioning matches. We can validate the number of partitions but the rest is hard. ########## File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.read; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.connector.expressions.NamedReference; +import org.apache.spark.sql.sources.Filter; + +/** + * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can + * dynamically filter {@link InputPartition}s that were originally planned using predicates + * Spark infers by reusing parts of the query. + * + * @since 3.2.0 + */ +@Experimental +public interface SupportsDynamicFiltering extends Scan { + /** + * Returns attributes this scan can be dynamically filtered by. + * <p> + * Spark will call {@link #filter(Filter[])} if it can derive a dynamic + * predicate for any of the filter attributes. + */ + NamedReference[] filterAttributes(); + + /** + * Dynamically filters this scan. + * <p> + * The provided expressions must be interpreted as a set of filters that are ANDed together. + * Implementations may use the filters to prune originally planned {@link InputPartition}s. + * <p> + * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or + * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically. + * The newly produced {@link Batch} or its streaming alternative may report a subset of + * originally planned {@link InputPartition}s. 
+   *
+   * @param filters dynamic filters
+   */
+  void filter(Filter[] filters);

Review comment:
I'll be fine with either adding `SupportsPushDownV2Filters` or just adding new methods with a default implementation.

```
interface SupportsPushDownFilters extends ScanBuilder {
  Filter[] pushFilters(Filter[] filters);

  default V2Filter[] pushFilters(V2Filter[] filters) {
    // convert v2 to v1 and call pushFilters for v1
    // convert v1 result back to v2
  }
}
```

In any case, we must ensure we don't break existing sources.

##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.
+   * The newly produced {@link Batch} or its streaming alternative may report a subset of
+   * originally planned {@link InputPartition}s.
+   *
+   * @param filters dynamic filters
+   */
+  void filter(Filter[] filters);

Review comment:
Is it alright with everybody to consider v1 filters in the scope of this PR? I'll take over @dbtsai's PR later.
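
For illustration only, the default-method option raised in this thread for `SupportsPushDownFilters` could look roughly like the sketch below. Everything in it is a hypothetical stand-in rather than Spark code: `V1EqualTo`, `V2EqualTo`, `PushDownFiltersSketch`, `LegacySource` and `PushDownBridgeDemo` are invented names, and the v1/v2 conversion is reduced to a single equality filter. The only behavior assumed from Spark is that `pushFilters` returns the filters that still have to be evaluated after the scan.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a v1 filter "attribute = value"; not Spark's Filter class. */
final class V1EqualTo {
  final String attribute;
  final Object value;

  V1EqualTo(String attribute, Object value) {
    this.attribute = attribute;
    this.value = value;
  }
}

/** Hypothetical stand-in for a v2 filter; the real v2 filter API is not modeled here. */
final class V2EqualTo {
  final String column;
  final Object literal;

  V2EqualTo(String column, Object literal) {
    this.column = column;
    this.literal = literal;
  }
}

/** Simplified sketch of a push-down mix-in with a v2 overload added as a default method. */
interface PushDownFiltersSketch {
  /** Existing v1 contract: returns the filters that must still be evaluated after the scan. */
  V1EqualTo[] pushFilters(V1EqualTo[] filters);

  /** New v2 overload; the default body keeps existing v1-only sources compiling and working. */
  default V2EqualTo[] pushFilters(V2EqualTo[] filters) {
    // Convert v2 to v1 and call the v1 method.
    V1EqualTo[] v1 = Arrays.stream(filters)
        .map(f -> new V1EqualTo(f.column, f.literal))
        .toArray(V1EqualTo[]::new);
    V1EqualTo[] remaining = pushFilters(v1);
    // Convert the v1 result back to v2.
    return Arrays.stream(remaining)
        .map(f -> new V2EqualTo(f.attribute, f.value))
        .toArray(V2EqualTo[]::new);
  }
}

/** An "existing" source that only implements the v1 method and can only handle filters on "id". */
final class LegacySource implements PushDownFiltersSketch {
  final List<V1EqualTo> pushed = new ArrayList<>();

  @Override
  public V1EqualTo[] pushFilters(V1EqualTo[] filters) {
    List<V1EqualTo> remaining = new ArrayList<>();
    for (V1EqualTo f : filters) {
      if ("id".equals(f.attribute)) {
        pushed.add(f);
      } else {
        remaining.add(f);
      }
    }
    return remaining.toArray(new V1EqualTo[0]);
  }
}

final class PushDownBridgeDemo {
  public static void main(String[] args) {
    LegacySource source = new LegacySource();
    // The source never overrides the v2 overload, yet it can be called through the default.
    V2EqualTo[] remaining = source.pushFilters(new V2EqualTo[] {
        new V2EqualTo("id", 1), new V2EqualTo("name", "a")});
    System.out.println(source.pushed.size() + " pushed, " + remaining.length + " remaining");
  }
}
```

In this sketch `LegacySource` is untouched by the new overload, which is the "don't break existing sources" property asked for above. A separate `SupportsPushDownV2Filters` interface would reach the same goal by leaving the v1 interface unchanged.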