Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21503#discussion_r194829647
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
    @@ -17,15 +17,56 @@
     
     package org.apache.spark.sql.execution.datasources.v2
     
    -import org.apache.spark.sql.Strategy
    +import org.apache.spark.sql.{execution, Strategy}
    +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet}
    +import org.apache.spark.sql.catalyst.planning.PhysicalOperation
     import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
     import org.apache.spark.sql.execution.SparkPlan
     import 
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
 WriteToContinuousDataSourceExec}
     
     object DataSourceV2Strategy extends Strategy {
       override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    -    case r: DataSourceV2Relation =>
    -      DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, 
r.reader) :: Nil
    +    case PhysicalOperation(project, filters, relation: 
DataSourceV2Relation) =>
    +      val projectSet = AttributeSet(project.flatMap(_.references))
    +      val filterSet = AttributeSet(filters.flatMap(_.references))
    +
    +      val projection = if (filterSet.subsetOf(projectSet) &&
    +          AttributeSet(relation.output) == projectSet) {
    +        // When the required projection contains all of the filter columns 
and column pruning alone
    +        // can produce the required projection, push the required 
projection.
    +        // A final projection may still be needed if the data source 
produces a different column
    +        // order or if it cannot prune all of the nested columns.
    +        relation.output
    +      } else {
    +        // When there are filter columns not already in the required 
projection or when the required
    +        // projection is more complicated than column pruning, base column 
pruning on the set of
    +        // all columns needed by both.
    +        (projectSet ++ filterSet).toSeq
    +      }
    +
    +      val reader = relation.newReader
    --- End diff --
    
    It will configure two readers. One for the pushdown when converting to a 
physical plan and one for stats. The stats one should be temporary, though, 
since we want to address the problem. Configuring two readers instead of one 
allows us to decouple the problems so we can move forward with pushdown that 
works like the other data sources.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to