[ 
https://issues.apache.org/jira/browse/SPARK-22181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16187525#comment-16187525
 ] 

Sathiya Kumar commented on SPARK-22181:
---------------------------------------

Here is the rule implementation, which could be scheduled before the 
`ReplaceExceptWithAntiJoin` rule:

{code:java}
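import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Not}
import org.apache.spark.sql.catalyst.optimizer.CombineFilters
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Except, Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object ReplaceExceptWithNotFilter extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Except(left, right) if isEligible(left, right) =>
      // isEligible guarantees `right` is a Filter, so the combined plan is one too.
      val rightCondition = combineFilters(right).asInstanceOf[Filter].condition
      // Caveat: if the condition evaluates to NULL for a row, Not(...) is also NULL
      // and the row is dropped, whereas Except would keep it.
      Distinct(Filter(Not(replaceAttributesIn(rightCondition, left)), left))
  }

  // Eligible when the right side is a chain of Filters over the same plan that the
  // left side is, or is itself filtered from.
  def isEligible(left: LogicalPlan, right: LogicalPlan): Boolean = (left, right) match {
    case (l: Filter, r: Filter) => parent(l).sameResult(parent(r))
    case (l, r: Filter) => l.sameResult(parent(r))
    case _ => false
  }

  // Skips any chain of Filters and returns the first non-Filter plan beneath it.
  def parent(plan: LogicalPlan): LogicalPlan = plan match {
    case Filter(_, child) => parent(child)
    case other => other
  }

  // Applies CombineFilters until a fixed point, collapsing stacked Filters into one.
  def combineFilters(plan: LogicalPlan): LogicalPlan = CombineFilters(plan) match {
    case result if !result.fastEquals(plan) => combineFilters(result)
    case result => result
  }

  // Rebinds attributes in the right-hand condition, by name, to the left child's output.
  // `.get` throws if the left output has no attribute with that name.
  def replaceAttributesIn(condition: Expression, leftChild: LogicalPlan): Expression = {
    condition transform {
      case a: AttributeReference => leftChild.output.find(_.name == a.name).get
    }
  }
}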
{code}
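
To make the shape of the rewrite easier to follow without a Spark build, here is a minimal sketch on a toy plan tree. Everything in it (`ToyRewrite`, `Relation`, the string-valued conditions) is hypothetical and only mimics the structure of the Catalyst classes used above; it is not how Catalyst represents expressions, and equality of toy nodes stands in for `sameResult`.

```scala
object ToyRewrite {
  // Toy stand-ins for Catalyst plan nodes; these are NOT Spark classes.
  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Except(left: Plan, right: Plan) extends Plan
  case class Distinct(child: Plan) extends Plan

  // Skip a chain of Filters and return the plan underneath it.
  @annotation.tailrec
  def parent(plan: Plan): Plan = plan match {
    case Filter(_, child) => parent(child)
    case other => other
  }

  // Collect the conditions of a chain of Filters, outermost first.
  def conditions(plan: Plan): List[String] = plan match {
    case Filter(c, child) => c :: conditions(child)
    case _ => Nil
  }

  // Rewrite Except(left, right) into Distinct(Filter(NOT(...), left)) when the
  // right side is a chain of Filters over the same source as the left side.
  def rewrite(plan: Plan): Plan = plan match {
    case Except(left, right: Filter) if parent(left) == parent(right) =>
      val negated = conditions(right).map(c => s"($c)").mkString("NOT (", " AND ", ")")
      Distinct(Filter(negated, left))
    case other => other
  }
}
```

For example, with `val t = Relation("t")`, rewriting `Except(Filter("a > 1", t), Filter("b < 5", t))` yields a `Distinct` over a `Filter` on the left plan with the right-hand condition negated, and no join is involved.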


> ReplaceExceptWithNotFilter if one or both of the datasets are fully derived 
> out of Filters from a same parent
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22181
>                 URL: https://issues.apache.org/jira/browse/SPARK-22181
>             Project: Spark
>          Issue Type: New Feature
>          Components: Optimizer, SQL
>    Affects Versions: 2.1.1, 2.2.0
>            Reporter: Sathiya Kumar
>            Priority: Minor
>
> When the Except operator is applied between two datasets, and one or both of 
> the datasets are derived purely through filter operations on the same parent, 
> then instead of rewriting Except as an expensive join, we can rewrite it as a 
> filter by negating the filter condition of the right node.
> ![Case-1](https://github.com/sathiyapk/Blog-Posts/blob/master/images/spark-optimizer/ReplaceExceptWithNotFilter-case1.png)
> ![Case-2](https://github.com/sathiyapk/Blog-Posts/blob/master/images/spark-optimizer/ReplaceExceptWithNotFilter-case2.png)
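
The equivalence the description relies on can be checked on plain Scala collections. This is only a sketch under assumptions: the `Row` schema, the sample data, and both conditions are made up for illustration, and it uses ordinary two-valued Boolean logic, so SQL NULL handling is not modelled.

```scala
object ExceptAsFilter {
  // Hypothetical row type and sample data, for illustration only.
  case class Row(id: Int, score: Int)

  val parent: Seq[Row] = Seq(Row(1, 10), Row(2, 60), Row(2, 60), Row(3, 90))

  // Both sides are derived from the same parent purely by filtering.
  val leftCond: Row => Boolean = _.score > 5
  val rightCond: Row => Boolean = _.score > 50

  val left: Seq[Row] = parent.filter(leftCond)
  val right: Seq[Row] = parent.filter(rightCond)

  // Except semantics: distinct rows of `left` that do not appear in `right`.
  val viaExcept: Set[Row] = left.toSet -- right.toSet

  // Rewritten form: keep the left rows that fail the right condition, then deduplicate.
  val viaFilter: Set[Row] = left.filter(r => !rightCond(r)).toSet
}
```

Both `viaExcept` and `viaFilter` contain the same rows here, which is the property that lets the join-based rewrite be skipped in this case.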


