[ 
https://issues.apache.org/jira/browse/SPARK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuming Wang updated SPARK-34081:
--------------------------------
    Description: 
{code:scala}
spark.range(5).selectExpr("id as a", "id as b").write.saveAsTable("t1")
spark.range(3).selectExpr("id as c", "id as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
{code}

Current:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, 
[id=#68]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), 
coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), 
coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
                  :- FileScan parquet default.t1[a#16L,b#17L] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), 
isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), 
isnull(input[1, bigint, true])),false), [id=#64]
                     +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                        +- Exchange hashpartitioning(c#18L, d#19L, 5), 
ENSURE_REQUIREMENTS, [id=#61]
                           +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                              +- FileScan parquet default.t2[c#18L,d#19L] 
Batched: true, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>

{noformat}


Expected:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 
0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), 
isnull(d#19L)], LeftSemi, BuildRight, false
         :- HashAggregate(keys=[a#16L, b#17L], functions=[])
         :  +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, 
[id=#61]
         :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         :        +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
         +- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), 
isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), 
isnull(input[1, bigint, true])),false), [id=#66]
            +- HashAggregate(keys=[c#18L, d#19L], functions=[])
               +- Exchange hashpartitioning(c#18L, d#19L, 5), 
ENSURE_REQUIREMENTS, [id=#63]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, 
DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
{noformat}

> Should not pushdown LeftSemi/LeftAnti over Aggregate
> ----------------------------------------------------
>
>                 Key: SPARK-34081
>                 URL: https://issues.apache.org/jira/browse/SPARK-34081
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Yuming Wang
>            Priority: Major
>
> {code:scala}
> spark.range(5).selectExpr("id as a", "id as b").write.saveAsTable("t1")
> spark.range(3).selectExpr("id as c", "id as d").write.saveAsTable("t2")
> spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
> {code}
> Current:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>    +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>       +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, 
> [id=#68]
>             +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>                +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), 
> coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), 
> coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
>                   :- FileScan parquet default.t1[a#16L,b#17L] Batched: true, 
> DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<a:bigint,b:bigint>
>                   +- BroadcastExchange 
> HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), 
> isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), 
> isnull(input[1, bigint, true])),false), [id=#64]
>                      +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                         +- Exchange hashpartitioning(c#18L, d#19L, 5), 
> ENSURE_REQUIREMENTS, [id=#61]
>                            +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                               +- FileScan parquet default.t2[c#18L,d#19L] 
> Batched: true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<c:bigint,d:bigint>
> {noformat}
> Expected:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>    +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>       +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), 
> coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), 
> coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
>          :- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          :  +- Exchange hashpartitioning(a#16L, b#17L, 5), 
> ENSURE_REQUIREMENTS, [id=#61]
>          :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          :        +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, 
> DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<a:bigint,b:bigint>
>          +- BroadcastExchange 
> HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), 
> isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), 
> isnull(input[1, bigint, true])),false), [id=#66]
>             +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                +- Exchange hashpartitioning(c#18L, d#19L, 5), 
> ENSURE_REQUIREMENTS, [id=#63]
>                   +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                      +- FileScan parquet default.t2[c#18L,d#19L] Batched: 
> true, DataFilters: [], Format: Parquet, Location: 
> InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data...,
>  PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<c:bigint,d:bigint>
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to