Github user eatoncys commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22561#discussion_r225053437
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 ---
    @@ -39,21 +40,31 @@ private[sql] object PruneFileSourcePartitions extends 
Rule[LogicalPlan] {
                 _,
                 _))
             if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined 
=>
    +
    +      val sparkSession = fsRelation.sparkSession
    +      val partitionColumns =
    +        logicalRelation.resolve(
    +          partitionSchema, sparkSession.sessionState.analyzer.resolver)
    +      val partitionSet = AttributeSet(partitionColumns)
           // The attribute name of predicate could be different than the one 
in schema in case of
           // case insensitive, we should change them to match the one in 
schema, so we donot need to
           // worry about case sensitivity anymore.
           val normalizedFilters = filters.map { e =>
    -        e transform {
    +        e transformUp {
               case a: AttributeReference =>
                 
a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
    +          // Replace the nonPartitionOps field with true in the 
And(partitionOps, nonPartitionOps)
    +          // to make the partition can be pruned
    +          case and @And(left, right) =>
    +            val leftPartition = 
left.references.filter(partitionSet.contains(_))
    +            val rightPartition = 
right.references.filter(partitionSet.contains(_))
    +            if (leftPartition.size == left.references.size && 
rightPartition.size == 0) {
    +              and.withNewChildren(Seq(left, Literal(true, BooleanType)))
    +            } else if (leftPartition.size == 0 && rightPartition.size == 
right.references.size) {
    +              and.withNewChildren(Seq(Literal(true, BooleanType), right))
    +            } else and
             }
           }
    -
    -      val sparkSession = fsRelation.sparkSession
    -      val partitionColumns =
    -        logicalRelation.resolve(
    -          partitionSchema, sparkSession.sessionState.analyzer.resolver)
    -      val partitionSet = AttributeSet(partitionColumns)
           val partitionKeyFilters =
    --- End diff --
    
    @cloud-fan Sorry, I don't understand very clearly. The function 
splitConjunctivePredicates can only split and(a,b); if there is an or expression 
in the filter, for example 'where (p_d=2 and key=2) or (p_d=3 and key=3)', the 
result of splitConjunctivePredicates is '(((p_d#2 = 2) && (key#0 = 2)) || 
((p_d#2 = 3) && (key#0 = 3)))', so the partition expression cannot be split out.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to