Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20476#discussion_r165375489
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownOperatorsToDataSource.scala ---
    @@ -81,35 +81,34 @@ object PushDownOperatorsToDataSource extends Rule[LogicalPlan] with PredicateHelper
     
         // TODO: add more push down rules.
     
    -    // TODO: nested fields pruning
    -    def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: Seq[Attribute]): Unit = {
    -      plan match {
    -        case Project(projectList, child) =>
    -          val required = projectList.filter(requiredByParent.contains).flatMap(_.references)
    -          pushDownRequiredColumns(child, required)
    -
    -        case Filter(condition, child) =>
    -          val required = requiredByParent ++ condition.references
    -          pushDownRequiredColumns(child, required)
    -
    -        case DataSourceV2Relation(fullOutput, reader) => reader match {
    -          case r: SupportsPushDownRequiredColumns =>
    -            // Match original case of attributes.
    -            val attrMap = AttributeMap(fullOutput.zip(fullOutput))
    -            val requiredColumns = requiredByParent.map(attrMap)
    -            r.pruneColumns(requiredColumns.toStructType)
    -          case _ =>
    -        }
    +    pushDownRequiredColumns(filterPushed, filterPushed.outputSet)
    +    // After column pruning, we may have redundant PROJECT nodes in the query plan, remove them.
    +    RemoveRedundantProject(filterPushed)
    +  }
    +
    +  // TODO: nested fields pruning
    +  private def pushDownRequiredColumns(plan: LogicalPlan, requiredByParent: AttributeSet): Unit = {
    +    plan match {
    +      case Project(projectList, child) =>
    +        val required = projectList.flatMap(_.references)
    +        pushDownRequiredColumns(child, AttributeSet(required))
    +
    +      case Filter(condition, child) =>
    +        val required = requiredByParent ++ condition.references
    +        pushDownRequiredColumns(child, required)
     
    -        // TODO: there may be more operators can be used to calculate required columns, we can add
    -        // more and more in the future.
    -        case _ => plan.children.foreach(child => pushDownRequiredColumns(child, child.output))
    +      case relation: DataSourceV2Relation => relation.reader match {
    +        case reader: SupportsPushDownRequiredColumns =>
    +          val requiredColumns = relation.output.filter(requiredByParent.contains)
    --- End diff ---
    
    A cleaner way to retain the original case of attributes.
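
    For reference, `AttributeSet` membership is tested by `exprId` rather than by name, which is why filtering `relation.output` keeps the attribute instances that carry the original-case names, where the old code had to rebuild them through an `AttributeMap`. A minimal sketch of the idea against the internal Catalyst API (the column name `UserId` is made up for illustration):

        import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet}
        import org.apache.spark.sql.types.IntegerType

        // Hypothetical column: the relation defines it as "UserId".
        val original = AttributeReference("UserId", IntegerType)()
        // A case-insensitive analyzer may refer to the same attribute under a
        // different spelling; withName keeps the exprId.
        val lowerCased = original.withName("userid")

        val requiredByParent = AttributeSet(Seq(lowerCased))

        // contains tests by exprId, so the surviving attribute is `original`
        // and its name stays "UserId", with no AttributeMap needed.
        val requiredColumns = Seq(original).filter(requiredByParent.contains)
        assert(requiredColumns.head.name == "UserId")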

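    Separately, on the `RemoveRedundantProject(filterPushed)` call in the diff: once the required columns are pushed into the reader, a `Project` that selects exactly its child's output becomes a no-op, which is what that rule strips. A small sketch using the Catalyst DSL (the relation and column names are made up):

        import org.apache.spark.sql.catalyst.dsl.expressions._
        import org.apache.spark.sql.catalyst.dsl.plans._
        import org.apache.spark.sql.catalyst.optimizer.RemoveRedundantProject
        import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

        // A relation with two columns, and a Project selecting exactly the
        // relation's output in the same order.
        val relation = LocalRelation('a.int, 'b.int)
        val plan = relation.select('a, 'b).analyze

        // Project.output == child.output, so the Project node is dropped.
        assert(RemoveRedundantProject(plan) == relation)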
