Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20988#discussion_r182673957
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
 ---
    @@ -129,35 +151,41 @@ case class OptimizeMetadataOnlyQuery(catalog: 
SessionCatalog) extends Rule[Logic
     
       /**
        * A pattern that finds the partitioned table relation node inside the 
given plan, and returns a
    -   * pair of the partition attributes and the table relation node.
    +   * pair of the partition attributes, partition filters, and the table 
relation node.
        *
        * It keeps traversing down the given plan tree if there is a 
[[Project]] or [[Filter]] with
        * deterministic expressions, and returns result after reaching the 
partitioned table relation
        * node.
        */
    -  object PartitionedRelation {
    -
    -    def unapply(plan: LogicalPlan): Option[(AttributeSet, LogicalPlan)] = 
plan match {
    -      case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
    -        if fsRelation.partitionSchema.nonEmpty =>
    -        val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
    -        Some((AttributeSet(partAttrs), l))
    -
    -      case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
    -        val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
    -        Some((AttributeSet(partAttrs), relation))
    -
    -      case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
    -        unapply(child).flatMap { case (partAttrs, relation) =>
    -          if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
relation)) else None
    -        }
    +  object PartitionedRelation extends PredicateHelper {
    +
    +    def unapply(plan: LogicalPlan): Option[(AttributeSet, Seq[Expression], 
LogicalPlan)] = {
    +      plan match {
    +        case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _, _)
    +          if fsRelation.partitionSchema.nonEmpty =>
    +          val partAttrs = 
getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
    +          Some((AttributeSet(partAttrs), Nil, l))
    +
    +        case relation: HiveTableRelation if 
relation.tableMeta.partitionColumnNames.nonEmpty =>
    +          val partAttrs = 
getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation)
    +          Some((AttributeSet(partAttrs), Nil, relation))
    +
    +        case p @ Project(projectList, child) if 
projectList.forall(_.deterministic) =>
    +          unapply(child).flatMap { case (partAttrs, filters, relation) =>
    +            if (p.references.subsetOf(partAttrs)) Some((p.outputSet, 
filters, relation)) else None
    +          }
     
    -      case f @ Filter(condition, child) if condition.deterministic =>
    -        unapply(child).flatMap { case (partAttrs, relation) =>
    -          if (f.references.subsetOf(partAttrs)) Some((partAttrs, 
relation)) else None
    -        }
    +        case f @ Filter(condition, child) if condition.deterministic =>
    +          unapply(child).flatMap { case (partAttrs, filters, relation) =>
    +            if (f.references.subsetOf(partAttrs)) {
    +              Some((partAttrs, splitConjunctivePredicates(condition) ++ 
filters, relation))
    --- End diff --
    
    There is a bug here. Consider `Filter(x > 1, Project(p + 1 as x, 
Table(a, p, partitioned by p)))`: we will mistakenly report `x > 1` as a 
partition predicate, even though `x` is only an alias for `p + 1` and not a 
partition column, then use it to list partitions and fail.
    
    I think we should use `PhysicalOperation` here, which can help us 
substitute the attributes in the filter.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to