Michael Tong created SPARK-40824:
------------------------------------

             Summary: Certain aggregations cause extra exchange steps on unioned and bucketed tables
                 Key: SPARK-40824
                 URL: https://issues.apache.org/jira/browse/SPARK-40824
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.3.0
            Reporter: Michael Tong
An extension to https://issues.apache.org/jira/browse/SPARK-22898.

I am currently working on a POC where we store aggregations of features across different datasets. I have noticed that when you perform certain aggregation operations across multiple bucketed tables, Spark introduces an extra exchange step.

{code:java}
# initializing the tables
sql("""
CREATE TABLE t1 (`id` BIGINT, `value` INT)
USING PARQUET
CLUSTERED BY (id) INTO 1 BUCKETS
""")
sql("""
CREATE TABLE t2 (`id` BIGINT, `value` INT)
USING PARQUET
CLUSTERED BY (id) INTO 1 BUCKETS
""")
sql("INSERT INTO TABLE t1 VALUES(1, 2)")
sql("INSERT INTO TABLE t2 VALUES(1, 3)")

# aggregation, note the exchange after the union operation
sql("""
SELECT id, COUNT(*)
FROM (SELECT id FROM t1 UNION SELECT id FROM t2)
GROUP BY id
""").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]
            +- HashAggregate(keys=[id#92L], functions=[])
               +- Union
                  :- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
{code}

This looks like a query-optimizer issue, because a different set and order of operations (in this case, group by and count on the individual tables, join the tables, then infer the union count from the joined values) produces a query plan that does not have this exchange step.

{code:java}
sql("""
SELECT t1_agg.id, t1_agg.count + t2_agg.count AS count
FROM (SELECT id, COUNT(*) AS count FROM t1 GROUP BY id) AS t1_agg
JOIN (SELECT id, COUNT(*) AS count FROM t2 GROUP BY id) AS t2_agg
ON t1_agg.id = t2_agg.id
""").explain()

# note the lack of an exchange step
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#92L, (count#121L + count#122L) AS count#123L]
   +- SortMergeJoin [id#92L], [id#94L], Inner
      :- Sort [id#92L ASC NULLS FIRST], false, 0
      :  +- HashAggregate(keys=[id#92L], functions=[count(1)])
      :     +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      :        +- Filter isnotnull(id#92L)
      :           +- FileScan parquet default.t1[id#92L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#92L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
      +- Sort [id#94L ASC NULLS FIRST], false, 0
         +- HashAggregate(keys=[id#94L], functions=[count(1)])
            +- HashAggregate(keys=[id#94L], functions=[partial_count(1)])
               +- Filter isnotnull(id#94L)
                  +- FileScan parquet default.t2[id#94L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#94L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3a://spark-warehouse/t2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 1 out of 1
{code}

It feels like the first union-then-aggregate query should not need an exchange step, just like the second one.
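As a quick way to check for this regression without eyeballing the plan, one can scan the explain output for Exchange operators. This is a minimal sketch (plain Python, not from the original report; `count_exchanges` is a hypothetical helper) that operates on the plan text, here an abridged copy of the union->aggregate plan above:

```python
# Hypothetical helper (not part of the report): counts Exchange operators
# in the text of a Spark physical plan, e.g. the output of .explain().
def count_exchanges(plan: str) -> int:
    return sum(1 for line in plan.splitlines() if "Exchange" in line)

# Abridged copy of the union->aggregate plan shown above.
union_plan = """AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[id#92L], functions=[count(1)])
   +- HashAggregate(keys=[id#92L], functions=[partial_count(1)])
      +- HashAggregate(keys=[id#92L], functions=[])
         +- Exchange hashpartitioning(id#92L, 100), ENSURE_REQUIREMENTS, [id=#202]"""

print(count_exchanges(union_plan))  # prints 1: the unexpected shuffle
```

The join-based plan above would yield 0 under the same check.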