Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22561#discussion_r225030929
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
    @@ -39,21 +40,31 @@ private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] {
                 _,
                 _))
             if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
    +
    +      val sparkSession = fsRelation.sparkSession
    +      val partitionColumns =
    +        logicalRelation.resolve(
    +          partitionSchema, sparkSession.sessionState.analyzer.resolver)
    +      val partitionSet = AttributeSet(partitionColumns)
           // The attribute name of predicate could be different than the one 
in schema in case of
           // case insensitive, we should change them to match the one in 
schema, so we donot need to
           // worry about case sensitivity anymore.
           val normalizedFilters = filters.map { e =>
    -        e transform {
    +        e transformUp {
               case a: AttributeReference =>
                 
a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
    +          // Replace the nonPartitionOps field with true in the 
And(partitionOps, nonPartitionOps)
    +          // to make the partition can be pruned
    +          case and @And(left, right) =>
    +            val leftPartition = 
left.references.filter(partitionSet.contains(_))
    +            val rightPartition = 
right.references.filter(partitionSet.contains(_))
    +            if (leftPartition.size == left.references.size && 
rightPartition.size == 0) {
    +              and.withNewChildren(Seq(left, Literal(true, BooleanType)))
    +            } else if (leftPartition.size == 0 && rightPartition.size == 
right.references.size) {
    +              and.withNewChildren(Seq(Literal(true, BooleanType), right))
    +            } else and
             }
           }
    -
    -      val sparkSession = fsRelation.sparkSession
    -      val partitionColumns =
    -        logicalRelation.resolve(
    -          partitionSchema, sparkSession.sessionState.analyzer.resolver)
    -      val partitionSet = AttributeSet(partitionColumns)
           val partitionKeyFilters =
    --- End diff --
    
    I think a simpler change is
    ```
    val partitionKeyFilters = 
ExpressionSet(normalizedFilters.map(splitConjunctivePredicates).filter...)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to