GitHub user dilipbiswal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19451#discussion_r146466342
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala ---
    @@ -0,0 +1,114 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.optimizer
    +
    +import scala.annotation.tailrec
    +
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical._
    +import org.apache.spark.sql.catalyst.rules.Rule
    +
    +
    +/**
    + * If one or both of the datasets in the logical [[Except]] operator are purely transformed using
    + * [[Filter]], this rule will replace logical [[Except]] operator with a [[Filter]] operator by
    + * flipping the filter condition of the right child.
    + * {{{
    + *   SELECT a1, a2 FROM Tab1 WHERE a2 = 12 EXCEPT SELECT a1, a2 FROM Tab1 WHERE a1 = 5
    + *   ==>  SELECT DISTINCT a1, a2 FROM Tab1 WHERE a2 = 12 AND (a1 is null OR a1 <> 5)
    + * }}}
    + *
    + * Note:
    + * Before flipping the filter condition of the right node, we should:
    + * 1. Combine all its [[Filter]]s.
    + * 2. Apply InferFiltersFromConstraints rule (to support NULL values of the condition).
    + */
    +object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
    +
    +  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    +    case Except(left: Project, right) if isEligible(left, right) =>
    +      Project(left.projectList,
    +        Distinct(Filter(Not(transformCondition(left.child, skipProject(right))), left.child)))
    +
    +    case Except(left, right) if isEligible(left, right) =>
    +      Distinct(Filter(Not(transformCondition(left, skipProject(right))), left))
    +  }
    +
    +  private def transformCondition(left: LogicalPlan, right: LogicalPlan) = {
    +    val filterCondition = InferFiltersFromConstraints(combineFilters(right)
    +    ).asInstanceOf[Filter].condition
    +
    +    val attributeNameMap: Map[String, Attribute] = left.output.map(x => (x.name, x)).toMap
    +    val transformedCondition = filterCondition transform { case a: AttributeReference =>
    +      attributeNameMap(a.name)
    +    }
    +
    +    transformedCondition
    +  }
    +
    +  private def isEligible(left: LogicalPlan, right: LogicalPlan) = (left, right) match {
    +    case (_, right @ (Project(_, _: Filter) | Filter(_, _))) => verifyConditions(left, right)
    +    case _ => false
    +  }
    +
    +  private def verifyConditions(left: LogicalPlan, right: LogicalPlan) = {
    +    val leftProjectList = projectList(left)
    +    val rightProjectList = projectList(right)
    +
    +    verifyFilterCondition(skipProject(left)) && verifyFilterCondition(skipProject(right)) &&
    +      Project(leftProjectList, nonFilterChild(skipProject(left))).sameResult(
    +        Project(rightProjectList, nonFilterChild(skipProject(right))))
    +  }
    +
    +  private def verifyFilterCondition(plan: LogicalPlan) = {
    --- End diff --
    
    @sathiyapk I suspect there may be a regression in the following query. I am using the same schema as the one used in except.sql.
    ```
    select t1.* from t1, t2 where t1.k = t2.k
    except
    select t1.* from t1, t2 where t1.k = t2.k and t1.k != 'one'
    ```
    With this new rule we produce more rows, because the distinct is not applied on the correct set of columns.
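    A quick way to see where the extra rows could come from (a plain-Scala sketch over collections with made-up data, not the actual plan execution): a correct EXCEPT deduplicates on the projected t1.* columns only, whereas the rewritten plan places Distinct below the Project, i.e. over the whole join output (t1.*, t2.*):
    ```scala
    // Hypothetical data for t1(k, v) / t2(k, v); not the except.sql fixtures.
    case class Row(k: String, v: Int)

    val t1 = Seq(Row("one", 1), Row("one", 2), Row("two", 3))
    val t2 = Seq(Row("one", 5), Row("one", 6))

    // left side: select t1.* from t1, t2 where t1.k = t2.k
    val joined = for (a <- t1; b <- t2 if a.k == b.k) yield (a, b)

    // For this data the right side (... and t1.k != 'one') joins to nothing,
    // so a correct EXCEPT is just DISTINCT over the projected t1.* rows.
    val correct = joined.map(_._1).distinct

    // The rewritten plan instead applies Distinct before the projection, i.e.
    // over (t1.*, t2.*) pairs, which keeps duplicate t1.* rows. (The flipped
    // filter keeps t1.k == 'one' rows here, so it is omitted for brevity.)
    val rewritten = joined.distinct.map(_._1)

    assert(correct.size == 2)    // Row(one,1), Row(one,2)
    assert(rewritten.size == 4)  // one copy of each t1.* row per matching t2 row
    ```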
    Also, for this test case, in the transformCondition function we encounter a collision in attribute names, since both sides of the join produce (k#, v#, k#, v#). Can we please check whether the logic used to replace the attribute references is correct?
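    The collision can be seen directly in the name-keyed Map that transformCondition builds: `left.output.map(x => (x.name, x)).toMap` keeps only the last attribute per name. A minimal stand-in (plain Scala, with hypothetical (name, exprId) tuples playing the role of AttributeReference):
    ```scala
    // Simulated left.output of the join above: duplicate names (k, v, k, v).
    // Each tuple stands in for an AttributeReference as (name, exprId).
    val leftOutput = Seq(("k", 1L), ("v", 2L), ("k", 3L), ("v", 4L))

    // Same construction as the rule: left.output.map(x => (x.name, x)).toMap
    val attributeNameMap = leftOutput.map(a => (a._1, a)).toMap

    assert(attributeNameMap.size == 2)          // k#1 and v#2 are silently dropped
    assert(attributeNameMap("k") == ("k", 3L))  // every k reference now binds to k#3
    ```
    So a reference to t1's k in the flipped condition could end up bound to t2's k (or vice versa); keying the lookup by something unambiguous such as the expression id might be one direction worth checking.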
    @gatorsmile FYI, please help double-check.

