Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/21857#discussion_r205566658 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1400,13 +1401,71 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] { */ object ReplaceExceptWithAntiJoin extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case Except(left, right) => + case Except(left, right, false) => assert(left.output.size == right.output.size) val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) } Distinct(Join(left, right, LeftAnti, joinCond.reduceLeftOption(And))) } } +/** + * Replaces logical [[ExceptAll]] operator using a combination of Union, Aggregate + * and Generate operator. + * + * Input Query : + * {{{ + * SELECT c1 FROM ut1 EXCEPT ALL SELECT c1 FROM ut2 + * }}} + * + * Rewritten Query: + * {{{ + * SELECT c1 + * FROM ( + * SELECT replicate_rows(sum_val, c1) AS (sum_val, c1) + * FROM ( + * SELECT c1, sum_val + * FROM ( + * SELECT c1, sum(vcol) AS sum_val + * FROM ( + * SELECT 1L as vcol, c1 FROM ut1 + * UNION ALL + * SELECT -1L as vcol, c1 FROM ut2 + * ) AS union_all + * GROUP BY union_all.c1 + * ) + * WHERE sum_val > 0 + * ) + * ) + * }}} + */ + +object RewriteExcepAll extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case Except(left, right, true) => + assert(left.output.size == right.output.size) + + val newColumnLeft = Alias(Literal(1L), "vcol")() + val newColumnRight = Alias(Literal(-1L), "vcol")() + val modifiedLeftPlan = Project(Seq(newColumnLeft) ++ left.output, left) + val modifiedRightPlan = Project(Seq(newColumnRight) ++ right.output, right) + val unionPlan = Union(modifiedLeftPlan, modifiedRightPlan) + val aggSumCol = + Alias(AggregateExpression(Sum(unionPlan.output.head.toAttribute), Complete, false), "sum")() + val aggOutputColumns = left.output ++ Seq(aggSumCol) + val aggregatePlan = Aggregate(left.output, aggOutputColumns, unionPlan) + val filteredAggPlan = Filter(GreaterThan(aggSumCol.toAttribute, Literal(0L)), aggregatePlan) + val genRowPlan = Generate( + ReplicateRows(Seq(aggSumCol.toAttribute) ++ left.output), + Nil, + false, + None, --- End diff -- @gatorsmile will change.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org