Peter Toth created SPARK-29375:
----------------------------------

             Summary: Exchange reuse across all subquery levels
                 Key: SPARK-29375
                 URL: https://issues.apache.org/jira/browse/SPARK-29375
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.0.0
            Reporter: Peter Toth
Currently, exchange reuse doesn't work across all subquery levels. Here is an example query:
{noformat}
SELECT
  (SELECT max(a.key) FROM testData AS a JOIN testData AS b ON b.key = a.key),
  a.key
FROM testData AS a
JOIN testData AS b ON b.key = a.key
{noformat}
where the plan is:
{noformat}
*(5) Project [Subquery scalar-subquery#240, [id=#193] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#193]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#189]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#13, 5), true, [id=#205]
   :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
   :        +- Scan[obj#12]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#205]
{noformat}
Note that the exchange in the subquery (id=#145) and the equivalent exchange in the main query (id=#205) are built independently, even though they shuffle the same data the same way. The plan could be improved to:
{noformat}
*(5) Project [Subquery scalar-subquery#240, [id=#211] AS scalarsubquery()#247, key#13]
:  +- Subquery scalar-subquery#240, [id=#211]
:     +- *(6) HashAggregate(keys=[], functions=[max(key#13)], output=[max(key)#246])
:        +- Exchange SinglePartition, true, [id=#207]
:           +- *(5) HashAggregate(keys=[], functions=[partial_max(key#13)], output=[max#251])
:              +- *(5) Project [key#13]
:                 +- *(5) SortMergeJoin [key#13], [key#243], Inner
:                    :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
:                    :  +- Exchange hashpartitioning(key#13, 5), true, [id=#145]
:                    :     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13]
:                    :        +- Scan[obj#12]
:                    +- *(4) Sort [key#243 ASC NULLS FIRST], false, 0
:                       +- ReusedExchange [key#243], Exchange hashpartitioning(key#13, 5), true, [id=#145]
+- *(5) SortMergeJoin [key#13], [key#241], Inner
   :- *(2) Sort [key#13 ASC NULLS FIRST], false, 0
   :  +- ReusedExchange [key#13], Exchange hashpartitioning(key#13, 5), true, [id=#145]
   +- *(4) Sort [key#241 ASC NULLS FIRST], false, 0
      +- ReusedExchange [key#241], Exchange hashpartitioning(key#13, 5), true, [id=#145]
{noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
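
The effect described above can be sketched in miniature. One plausible reading of the current behaviour is that exchanges are deduplicated with one reuse map per subquery level, so identical exchanges in different levels are never matched; the improvement amounts to sharing a single map across all levels. This is plain Python with hypothetical `Exchange`/`ReusedExchange` stand-ins for illustration only, not Spark's actual ReuseExchange rule:

```python
from dataclasses import dataclass

# Hypothetical stand-ins for plan nodes; Spark's real classes differ.
@dataclass(frozen=True)
class Exchange:
    partitioning: str          # e.g. "hashpartitioning(key, 5)"

@dataclass(frozen=True)
class ReusedExchange:
    partitioning: str

def _dedupe(exchanges, seen):
    """Replace any exchange whose canonical form was seen before."""
    out = []
    for ex in exchanges:
        if ex.partitioning in seen:
            out.append(ReusedExchange(ex.partitioning))
        else:
            seen[ex.partitioning] = ex
            out.append(ex)
    return out

def reuse_per_level(levels):
    """Current behaviour: a fresh reuse map per subquery level, so
    reuse only happens within a level."""
    return [_dedupe(exchanges, {}) for exchanges in levels]

def reuse_shared(levels):
    """Improved behaviour: one map shared across all levels, so the
    main query can reuse an exchange built inside a subquery."""
    seen = {}
    return [_dedupe(exchanges, seen) for exchanges in levels]

# Subquery and main query each shuffle testData by the same key twice.
subquery   = [Exchange("hashpartitioning(key, 5)"), Exchange("hashpartitioning(key, 5)")]
main_query = [Exchange("hashpartitioning(key, 5)"), Exchange("hashpartitioning(key, 5)")]

def distinct_exchanges(plans):
    return sum(isinstance(e, Exchange) for level in plans for e in level)

per_level = reuse_per_level([subquery, main_query])
shared    = reuse_shared([subquery, main_query])
print(distinct_exchanges(per_level))  # 2: one shuffle still built per level
print(distinct_exchanges(shared))     # 1: a single shuffle feeds both levels
```

This mirrors the plans above: today the main query builds its own exchange (id=#205) even though an equivalent one (id=#145) already exists in the subquery, whereas the improved plan points every consumer at id=#145.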