[jira] [Updated] (SPARK-34081) Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
[ https://issues.apache.org/jira/browse/SPARK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gengliang Wang updated SPARK-34081:
---
    Issue Type: Improvement  (was: Bug)

> Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
> ---
>
> Key: SPARK-34081
> URL: https://issues.apache.org/jira/browse/SPARK-34081
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Yuming Wang
> Assignee: Yuming Wang
> Priority: Major
> Fix For: 3.2.0
>
> Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases.
> {code:scala}
> spark.range(5000L).selectExpr("id % 1 as a", "id % 1 as b").write.saveAsTable("t1")
> spark.range(4000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
> spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
> {code}
> Current:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>    +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>       +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
>             +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>                +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
>                   :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
>                   :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
>                   :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>                   +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
>                      +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
>                         +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                            +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
>                               +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                                  +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
> {noformat}
>
> Expected:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>    +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
>       +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
>             :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
>             :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
>             :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>             :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
>             :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>             :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>             +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
>                +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5),
[jira] [Updated] (SPARK-34081) Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
[ https://issues.apache.org/jira/browse/SPARK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-34081:
    Description:
Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases.
{code:scala}
spark.range(5000L).selectExpr("id % 1 as a", "id % 1 as b").write.saveAsTable("t1")
spark.range(4000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
{code}
Current:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{noformat}

Expected:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
{noformat}

  was:
Should not pushdown LeftSemi/LeftAnti over Aggregate for some cases.
{code:scala}
spark.range(5000L).selectExpr("id % 1 as a", "id % 1 as
[jira] [Updated] (SPARK-34081) Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
[ https://issues.apache.org/jira/browse/SPARK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-34081:
    Summary: Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join  (was: Should not pushdown LeftSemi/LeftAnti over Aggregate)

> Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
> ---
>
> Key: SPARK-34081
> URL: https://issues.apache.org/jira/browse/SPARK-34081
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Yuming Wang
> Priority: Major
>
> {code:scala}
> spark.range(5).selectExpr("id as a", "id as b").write.saveAsTable("t1")
> spark.range(3).selectExpr("id as c", "id as d").write.saveAsTable("t2")
> spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
> {code}
> Current:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>    +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>       +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#68]
>             +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>                +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
>                   :- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>                   +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), isnull(input[1, bigint, true])),false), [id=#64]
>                      +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                         +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
>                            +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                               +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
> {noformat}
> Expected:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>    +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>       +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
>          :- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          :  +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
>          :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
>          :        +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>          +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), isnull(input[1, bigint, true])),false), [id=#66]
>             +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
>                   +- HashAggregate(keys=[c#18L, d#19L], functions=[])
>                      +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct
> {noformat}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
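
The condition in the issue summary (push a LeftSemi/LeftAnti join below an Aggregate only when the join can be planned as a broadcast join) can be illustrated with a small toy model. This is a minimal sketch, not Spark's optimizer code: the types `Plan`, `Scan`, `Aggregate`, `SemiJoin` and the object `PushSemiJoinSketch` are hypothetical names invented for illustration, the size estimate is a placeholder, and the sketch ignores the join condition (a real rule would also have to check that the condition only references the grouping keys). The threshold parameter stands in for a setting such as `spark.sql.autoBroadcastJoinThreshold`, where a negative value disables broadcasting.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's Catalyst classes.
sealed trait Plan { def sizeInBytes: Long }

// A leaf table scan with an assumed, pre-computed size estimate.
final case class Scan(table: String, sizeInBytes: Long) extends Plan

// A distinct-style aggregate; the toy estimate simply reuses the child's size.
final case class Aggregate(keys: Seq[String], child: Plan) extends Plan {
  def sizeInBytes: Long = child.sizeInBytes
}

// A left semi join; its output is at most as large as its left input.
final case class SemiJoin(left: Plan, right: Plan) extends Plan {
  def sizeInBytes: Long = left.sizeInBytes
}

object PushSemiJoinSketch {
  // A plan is treated as broadcastable when its estimated size fits under the threshold.
  // A negative threshold disables broadcasting altogether.
  def canBroadcast(plan: Plan, thresholdBytes: Long): Boolean =
    thresholdBytes >= 0 && plan.sizeInBytes <= thresholdBytes

  // Push the semi join below the left-side aggregate only if the right side can be
  // broadcast; otherwise leave the plan unchanged so the aggregate shrinks the left
  // side before any shuffle-based join runs.
  def pushDown(plan: Plan, thresholdBytes: Long): Plan = plan match {
    case SemiJoin(Aggregate(keys, child), right) if canBroadcast(right, thresholdBytes) =>
      Aggregate(keys, SemiJoin(child, right))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val threshold = 10L * 1024 * 1024 // assumed 10 MB threshold
    val left = Aggregate(Seq("a", "b"), Scan("t1", 400L * 1024 * 1024))
    val smallRight = Aggregate(Seq("c", "d"), Scan("t2", 1024L))
    val largeRight = Aggregate(Seq("c", "d"), Scan("t2", 300L * 1024 * 1024))

    println(pushDown(SemiJoin(left, smallRight), threshold)) // join moved below the aggregate
    println(pushDown(SemiJoin(left, largeRight), threshold)) // plan left as it was
  }
}
```

In this model the large right side corresponds to the SortMergeJoin plans above, where keeping the aggregate below the join avoids shuffling and sorting the undeduplicated rows of `t1`.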