Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20485#discussion_r165579903
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala
 ---
    @@ -81,33 +81,44 @@ object PushDownOperatorsToDataSource extends 
Rule[LogicalPlan] with PredicateHel
     
         // TODO: add more push down rules.
     
    -    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
    +    val columnPruned = pushDownRequiredColumns(filterPushed, 
filterPushed.outputSet)
         // After column pruning, we may have redundant PROJECT nodes in the 
query plan, remove them.
    -    RemoveRedundantProject(filterPushed)
    +    RemoveRedundantProject(columnPruned)
       }
     
       // TODO: nested fields pruning
    -  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: 
AttributeSet): Unit = {
    +  private def pushDownRequiredColumns(
    +      plan: LogicalPlan, requiredByParent: AttributeSet): LogicalPlan = {
         plan match {
    -      case Project(projectList, child) =>
    +      case p @ Project(projectList, child) =>
             val required = projectList.flatMap(_.references)
    -        pushDownRequiredColumns(child, AttributeSet(required))
    +        p.copy(child = pushDownRequiredColumns(child, 
AttributeSet(required)))
     
    -      case Filter(condition, child) =>
    +      case f @ Filter(condition, child) =>
             val required = requiredByParent ++ condition.references
    -        pushDownRequiredColumns(child, required)
    +        f.copy(child = pushDownRequiredColumns(child, required))
     
           case relation: DataSourceV2Relation => relation.reader match {
             case reader: SupportsPushDownRequiredColumns =>
    +          // TODO: Enable the below assert after we make 
`DataSourceV2Relation` immutable. Fow now
    +          // it's possible that the mutable reader being updated by 
someone else, and we need to
    +          // always call `reader.pruneColumns` here to correct it.
    +          // assert(relation.output.toStructType == reader.readSchema(),
    +          //  "Schema of data source reader does not match the relation 
plan.")
    +
               val requiredColumns = 
relation.output.filter(requiredByParent.contains)
               reader.pruneColumns(requiredColumns.toStructType)
     
    -        case _ =>
    +          val nameToAttr = 
relation.output.map(_.name).zip(relation.output).toMap
    +          val newOutput = reader.readSchema().map(_.name).map(nameToAttr)
    +          relation.copy(output = newOutput)
    --- End diff --
    
    @rdblue This is the bug I mentioned before. Finally I figured out a way to 
fix it surgically: always run column pruning even no column needs to be pruned. 
This helps us correct the required schema of the reader, if it's updated by 
someone else.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to