aokolnychyi commented on a change in pull request #32921:
URL: https://github.com/apache/spark/pull/32921#discussion_r654044355



##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.
+   * The newly produced {@link Batch} or its streaming alternative may report a subset of
+   * originally planned {@link InputPartition}s.
+   *
+   * @param filters dynamic filters
+   */
+  void filter(Filter[] filters);

Review comment:
       I think that would be best, @cloud-fan. Has there been any discussion on 
what the new API should look like? Since the old API has been exposed in 
`SupportsPushDownFilters`, what is the plan for introducing the new API? Will 
we introduce a new method with a default implementation that would translate 
into the old API?
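
   For illustration, here is a minimal connector-side sketch of the interface as proposed in this PR (it may be renamed `SupportsRuntimeFiltering` per the discussion below). `ExamplePartition`, `ExampleScan`, and the `part` column are made-up names, the reader factory is elided, and I'm assuming `Expressions.column` as the `NamedReference` factory.

   ```scala
   import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
   import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan, SupportsDynamicFiltering}
   import org.apache.spark.sql.sources.{EqualTo, Filter, In}
   import org.apache.spark.sql.types.StructType

   // Hypothetical partition carrying the value of a partition column named `part`.
   case class ExamplePartition(partValue: String, file: String) extends InputPartition

   class ExampleScan(schema: StructType, planned: Seq[ExamplePartition])
       extends Scan with SupportsDynamicFiltering {

     // Partitions are planned once; runtime filtering only narrows this set.
     private var partitions: Seq[ExamplePartition] = planned

     override def readSchema(): StructType = schema

     // The scan can be filtered at runtime by the `part` column.
     override def filterAttributes(): Array[NamedReference] = Array(Expressions.column("part"))

     // Filters are ANDed together; keep only partitions that may satisfy all of them.
     override def filter(filters: Array[Filter]): Unit = {
       partitions = partitions.filter { p =>
         filters.forall {
           case EqualTo("part", value) => p.partValue == value
           case In("part", values)     => values.contains(p.partValue)
           case _                      => true // unrecognized filter: keep the partition
         }
       }
     }

     // Called again after filter(); reports the (possibly pruned) partitions.
     override def toBatch: Batch = new Batch {
       override def planInputPartitions(): Array[InputPartition] = partitions.toArray[InputPartition]
       override def createReaderFactory(): PartitionReaderFactory = ??? // elided in this sketch
     }
   }
   ```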

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -96,6 +96,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
+    PrepareScans,

Review comment:
       I think @cloud-fan is right.
   
   If I understand correctly, `AdaptiveSparkPlanExec` runs 
`queryStagePreparationRules`, which add shuffles to the input plan, to get the 
initial physical plan. That plan is further analyzed and split into query 
stages based on shuffles. Once Spark creates stages in a plan, it applies 
`queryStageOptimizerRules` and `postStageCreationRules` to each of them. A 
stage may have a join without shuffles (e.g. if the scan output partitioning is 
`SinglePartition`). If we dynamically filter the scan and it now reports 0 
partitions, the stage execution would fail because the output partitioning is no 
longer valid.
   
   The problem is that we need to run the filters after 
`PlanAdaptiveDynamicPruningFilters`, which is part of the query optimizer rules 
in AQE.
   
   Any thoughts, @cloud-fan @sunchao? Is it possible to run 
`queryStagePreparationRules` before executing the stage?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
##########
@@ -227,3 +228,14 @@ object ReuseSubquery extends Rule[SparkPlan] {
     }
   }
 }
+
+object PrepareScans extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    val scans = plan.collect {
+      case scan: BatchScanExec => scan
+    }
+    scans.foreach(_.prepare())

Review comment:
       Sure! Will move to a new file. 

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can

Review comment:
       Agreed. Will add the details.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
##########
@@ -66,10 +71,33 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
             }
           case _ => None
         }
+      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsDynamicFiltering, _)) =>
+        val filterAttrs = AttributeSet(resolveRefs(r, scan.filterAttributes))
+        if (resExp.references.subsetOf(filterAttrs)) {
+          Some(r)
+        } else {
+          None
+        }
       case _ => None
     }
   }
 
+  // TODO: is there a good place to put this?
+  private def resolveRefs(

Review comment:
       Any existing classes we can use? Seems like we would need a new utility 
class. 
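
   For discussion, one hypothetical shape for the truncated `resolveRefs` helper quoted above, resolving the connector's `NamedReference`s against the scan relation's output. The method name mirrors the diff, but the body, placement, and the assumption that `conf.resolver` is in scope (e.g. inside a rule) are illustrative only.

   ```scala
   import org.apache.spark.sql.catalyst.expressions.Attribute
   import org.apache.spark.sql.connector.expressions.NamedReference
   import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

   // Unresolvable refs are silently dropped, so the subsetOf check above simply
   // fails and we skip the dynamic filter instead of throwing.
   private def resolveRefs(
       relation: DataSourceV2ScanRelation,
       refs: Array[NamedReference]): Seq[Attribute] = {
     refs.toSeq.flatMap { ref =>
       // Only top-level columns are handled in this sketch.
       relation.output.find(attr => conf.resolver(attr.name, ref.fieldNames.mkString(".")))
     }
   }
   ```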

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
##########
@@ -66,10 +71,33 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
             }
           case _ => None
         }
+      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsDynamicFiltering, _)) =>
+        val filterAttrs = AttributeSet(resolveRefs(r, scan.filterAttributes))

Review comment:
       +1 on skipping rather than failing if we cannot resolve the filter attrs.
   
   W.r.t. which columns to use, I resolved against the scan output on purpose, 
as we cannot derive a filter on an attribute that hasn't been projected.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
##########
@@ -17,38 +17,84 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import com.google.common.base.Objects
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsDynamicFiltering}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 
 /**
  * Physical plan node for scanning a batch of data from a data source v2.
  */
 case class BatchScanExec(
     output: Seq[AttributeReference],
-    @transient scan: Scan) extends DataSourceV2ScanExecBase {
+    @transient scan: Scan,
+    dynamicFilters: Seq[Expression]) extends DataSourceV2ScanExecBase {
 
   @transient lazy val batch = scan.toBatch
 
+  private lazy val runnableDynamicFilters = dynamicFilters.collect {
+    case e: DynamicPruningExpression if e.child != Literal.TrueLiteral => e
+  }
+
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec => this.batch == other.batch
+    case other: BatchScanExec =>
+      this.batch == other.batch && this.dynamicFilters == other.dynamicFilters
     case _ => false
   }
 
-  override def hashCode(): Int = batch.hashCode()
+  override def hashCode(): Int = Objects.hashCode(batch, dynamicFilters)
+
+  @transient private lazy val originalPartitions = batch.planInputPartitions()
+  @transient private var filteredPartitions: Option[Seq[InputPartition]] = None
 
-  @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
+  override def partitions: Seq[InputPartition] = filteredPartitions.getOrElse(originalPartitions)
 
-  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
+  private lazy val originalReaderFactory = batch.createReaderFactory()
+  private var filteredReaderFactory: Option[PartitionReaderFactory] = None
+
+  override def readerFactory: PartitionReaderFactory = {
+    filteredReaderFactory.getOrElse(originalReaderFactory)
+  }
 
   override lazy val inputRDD: RDD[InternalRow] = {
     new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics)
   }
 
   override def doCanonicalize(): BatchScanExec = {
-    this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output)))
+    this.copy(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      dynamicFilters = QueryPlan.normalizePredicates(
+        dynamicFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
+        output))
+  }
+
+  override protected def doPrepare(): Unit = {
+    if (runnableDynamicFilters.nonEmpty) {
+      // TODO: this triggers a broadcast which we don't need
+      waitForSubqueries()
+
+      val dataSourceFilters = runnableDynamicFilters.flatMap {
+        case DynamicPruningExpression(e) => DataSourceStrategy.translateDynamicFilter(e)
+        case _ => None
+      }
+
+      if (dataSourceFilters.isEmpty) {
+        logWarning("Skipping dynamic filtering, could not derive source filters")
+        return
+      }
+
+      val filterableScan = scan.asInstanceOf[SupportsDynamicFiltering]
+      filterableScan.filter(dataSourceFilters.toArray)
+
+      val filteredBatch = scan.toBatch
+
+      filteredPartitions = Some(filteredBatch.planInputPartitions())

Review comment:
       @viirya is correct. Usually, there is no second planning. Instead, the 
input partitions that have already been planned are filtered using the dynamic 
filters. 

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
##########
@@ -17,38 +17,84 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import com.google.common.base.Objects
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan}
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory, Scan, SupportsDynamicFiltering}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 
 /**
  * Physical plan node for scanning a batch of data from a data source v2.
  */
 case class BatchScanExec(
     output: Seq[AttributeReference],
-    @transient scan: Scan) extends DataSourceV2ScanExecBase {
+    @transient scan: Scan,
+    dynamicFilters: Seq[Expression]) extends DataSourceV2ScanExecBase {
 
   @transient lazy val batch = scan.toBatch
 
+  private lazy val runnableDynamicFilters = dynamicFilters.collect {
+    case e: DynamicPruningExpression if e.child != Literal.TrueLiteral => e
+  }
+
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
-    case other: BatchScanExec => this.batch == other.batch
+    case other: BatchScanExec =>
+      this.batch == other.batch && this.dynamicFilters == other.dynamicFilters
     case _ => false
   }
 
-  override def hashCode(): Int = batch.hashCode()
+  override def hashCode(): Int = Objects.hashCode(batch, dynamicFilters)
+
+  @transient private lazy val originalPartitions = batch.planInputPartitions()
+  @transient private var filteredPartitions: Option[Seq[InputPartition]] = None
 
-  @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
+  override def partitions: Seq[InputPartition] = filteredPartitions.getOrElse(originalPartitions)
 
-  override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
+  private lazy val originalReaderFactory = batch.createReaderFactory()
+  private var filteredReaderFactory: Option[PartitionReaderFactory] = None
+
+  override def readerFactory: PartitionReaderFactory = {
+    filteredReaderFactory.getOrElse(originalReaderFactory)
+  }
 
   override lazy val inputRDD: RDD[InternalRow] = {
     new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar, customMetrics)
   }
 
   override def doCanonicalize(): BatchScanExec = {
-    this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output)))
+    this.copy(
+      output = output.map(QueryPlan.normalizeExpressions(_, output)),
+      dynamicFilters = QueryPlan.normalizePredicates(
+        dynamicFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
+        output))
+  }
+
+  override protected def doPrepare(): Unit = {
+    if (runnableDynamicFilters.nonEmpty) {
+      // TODO: this triggers a broadcast which we don't need

Review comment:
       Will do. I am not sure about this point so we will need to discuss it a 
little bit.

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {

Review comment:
       I did use `dynamicXXX` as it is common throughout the code but 
`SupportsRuntimeFiltering` does sound more accurate. I'll update. 

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates

Review comment:
       Well, we do filter input partitions, but the filtering can be done using a 
metadata column (e.g. the file name).
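
   Continuing the `ExampleScan` sketch from earlier, filtering by a metadata column would only change the two members below. The `_file` column name is made up; real sources expose their own metadata columns.

   ```scala
   // Advertise the metadata column instead of a data column.
   override def filterAttributes(): Array[NamedReference] = Array(Expressions.column("_file"))

   // Prune already-planned partitions by file name.
   override def filter(filters: Array[Filter]): Unit = {
     partitions = partitions.filter { p =>
       filters.forall {
         case EqualTo("_file", name) => p.file == name
         case In("_file", names)     => names.contains(p.file)
         case _                      => true
       }
     }
   }
   ```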

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.

Review comment:
       I don't think it works for streaming plans right now. Shall I just refer 
to `toBatch` in the doc?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
##########
@@ -227,3 +228,14 @@ object ReuseSubquery extends Rule[SparkPlan] {
     }
   }
 }
+
+object PrepareScans extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    val scans = plan.collect {
+      case scan: BatchScanExec => scan

Review comment:
       I'd like to avoid that too. I am using `prepare` here as it is 
idempotent. If we return a new `BatchScanExec`, would that mean the subqueries 
will be executed twice, since the new plan won't have been prepared?
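
   For reference, the idempotence relied on here is roughly this pattern (a simplified illustration, not the actual `SparkPlan` source):

   ```scala
   // Simplified sketch of an idempotent prepare(): repeated calls are no-ops,
   // so a rule can safely call it without re-executing subqueries.
   abstract class PreparableNode {
     private var prepared = false

     final def prepare(): Unit = synchronized {
       if (!prepared) {
         prepareSubqueries() // placeholder for executing/waiting on subqueries
         doPrepare()         // e.g. where BatchScanExec would apply runtime filters
         prepared = true
       }
     }

     protected def prepareSubqueries(): Unit = ()
     protected def doPrepare(): Unit = ()
   }
   ```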

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PartitionPruning.scala
##########
@@ -66,10 +71,33 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with Join
             }
           case _ => None
         }
+      case (resExp, r @ DataSourceV2ScanRelation(_, scan: SupportsDynamicFiltering, _)) =>
+        val filterAttrs = AttributeSet(resolveRefs(r, scan.filterAttributes))
+        if (resExp.references.subsetOf(filterAttrs)) {
+          Some(r)
+        } else {
+          None
+        }
       case _ => None
     }
   }
 
+  // TODO: is there a good place to put this?
+  private def resolveRefs(

Review comment:
       I think this can go in a separate PR, landed before this one, once the 
review is done.

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.
+   * The newly produced {@link Batch} or its streaming alternative may report a subset of
+   * originally planned {@link InputPartition}s.
+   *
+   * @param filters dynamic filters
+   */
+  void filter(Filter[] filters);

Review comment:
       Thanks for the pointer, @sunchao. I think @rdblue's comment 
[here](https://github.com/dbtsai/spark/pull/10/files#r368072666) matches my 
proposal above about adding v2 filters in parallel and having a default 
implementation that converts v2 to v1. 
   
   I can pick up the v2 filter API but I'd like to do that independently of 
this PR if everyone is on board. 

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.

Review comment:
       Agreed. I'll update.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -96,6 +96,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
     PlanAdaptiveDynamicPruningFilters(this),
     ReuseAdaptiveSubquery(context.subqueryCache),
+    PrepareScans,

Review comment:
       While it would be easier to not allow data sources to change the 
partitioning during dynamic filtering, I am really trying to avoid that as it 
would be a substantial limitation. I mention this in the "Output Partitioning 
and Dynamic Filtering" section of the design doc. There are primarily two 
problems. First, if dynamic filtering is selective, this can lead to a lot of 
(almost) empty tasks being triggered, hurting performance. Second, it is 
hard to check whether the output partitioning matches. We can validate the 
number of partitions but the rest is hard. 

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.
+   * The newly produced {@link Batch} or its streaming alternative may report a subset of
+   * originally planned {@link InputPartition}s.
+   *
+   * @param filters dynamic filters
+   */
+  void filter(Filter[] filters);

Review comment:
       I'll be fine with either adding `SupportsPushDownV2Filters` or just 
adding new methods with a default implementation.
   
   ```
   interface SupportsPushDownFilters extends ScanBuilder {
     Filter[] pushFilters(Filter[] filters);
   
     default V2Filter[] pushFilters(V2Filter[] filters) {
       // convert v2 to v1 and call pushFilters for v1
       // convert v1 result back to v2
     }
   }
   ```
   
   In any case, we must ensure we don't break existing sources.

##########
File path: 
sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/SupportsDynamicFiltering.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.sources.Filter;
+
+/**
+ * A mix-in interface for {@link Scan}. Data sources can implement this interface if they can
+ * dynamically filter {@link InputPartition}s that were originally planned using predicates
+ * Spark infers by reusing parts of the query.
+ *
+ * @since 3.2.0
+ */
+@Experimental
+public interface SupportsDynamicFiltering extends Scan {
+  /**
+   * Returns attributes this scan can be dynamically filtered by.
+   * <p>
+   * Spark will call {@link #filter(Filter[])} if it can derive a dynamic
+   * predicate for any of the filter attributes.
+   */
+  NamedReference[] filterAttributes();
+
+  /**
+   * Dynamically filters this scan.
+   * <p>
+   * The provided expressions must be interpreted as a set of filters that are ANDed together.
+   * Implementations may use the filters to prune originally planned {@link InputPartition}s.
+   * <p>
+   * Spark will call {@link Scan#toBatch()}, {@link Scan#toMicroBatchStream(String)} or
+   * {@link Scan#toContinuousStream(String)} again after filtering the scan dynamically.
+   * The newly produced {@link Batch} or its streaming alternative may report a subset of
+   * originally planned {@link InputPartition}s.
+   *
+   * @param filters dynamic filters
+   */
+  void filter(Filter[] filters);

Review comment:
       Is it alright with everybody to consider v1 filters in the scope of this 
PR? I'll take over @dbtsai's PR later.
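
   To make the v1 scope concrete, the kind of filter a runtime-derived predicate could translate into might look like the following (illustrative values only; the actual translation logic is part of this PR):

   ```scala
   import org.apache.spark.sql.sources.{Filter, In}

   // Values collected at runtime from the completed build side of the join.
   val runtimeFilters: Array[Filter] = Array(In("part", Array[Any]("2021-06-01", "2021-06-02")))

   // A scan implementing the interface above would then drop planned partitions
   // whose `part` value is not in that set.
   ```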



