Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21574#discussion_r196173289
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
    @@ -17,51 +17,115 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.{execution, Strategy}
    -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.{sources, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression}
     import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    -import org.apache.spark.sql.execution.SparkPlan
    +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
    +import org.apache.spark.sql.execution.datasources.DataSourceStrategy
     import 
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
 WriteToContinuousDataSourceExec}
    +import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownCatalystFilters, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns}
     
     object DataSourceV2Strategy extends Strategy {
    -  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case PhysicalOperation(project, filters, relation: 
DataSourceV2Relation) =>
    -      val projectSet = AttributeSet(project.flatMap(_.references))
    -      val filterSet = AttributeSet(filters.flatMap(_.references))
    -
    -      val projection = if (filterSet.subsetOf(projectSet) &&
    -          AttributeSet(relation.output) == projectSet) {
    -        // When the required projection contains all of the filter columns 
and column pruning alone
    -        // can produce the required projection, push the required 
projection.
    -        // A final projection may still be needed if the data source 
produces a different column
    -        // order or if it cannot prune all of the nested columns.
    -        relation.output
    -      } else {
    -        // When there are filter columns not already in the required 
projection or when the required
    -        // projection is more complicated than column pruning, base column 
pruning on the set of
    -        // all columns needed by both.
    -        (projectSet ++ filterSet).toSeq
    -      }
     
    -      val reader = relation.newReader
    +  /**
    +   * Pushes down filters to the data source reader
    +   *
    +   * @return pushed filter and post-scan filters.
    +   */
    +  private def pushFilters(
    --- End diff --
    
    +1 for moving these functions. I considered it in the other commit, but 
decided to go with fewer changes. I like them here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to