Kontinuation opened a new issue, #1589:
URL: https://github.com/apache/datafusion-comet/issues/1589
### Describe the bug
AQE can dynamically transform a SortMergeJoin or ShuffledHashJoin into a
BroadcastHashJoin after discovering that one of the join's Exchange operators
wrote only a small amount of shuffle data. However, this optimization does not
always happen when using Comet.
TPC-H Q7 has an equi-join between `supplier` and `lineitem`. Without Comet,
Spark discovers after running the `Exchange` operator that `supplier` is small
enough to be broadcast, and dynamically changes the sort-merge join into a
broadcast hash join (see `BroadcastHashJoin Inner BuildLeft (15)`):
```
== Physical Plan ==
AdaptiveSparkPlan (99)
+- == Final Plan ==
   * Sort (62)
   +- AQEShuffleRead (61)
      +- ShuffleQueryStage (60), Statistics(sizeInBytes=288.0 B, rowCount=4)
         +- Exchange (59)
            +- * HashAggregate (58)
               +- AQEShuffleRead (57)
                  +- ShuffleQueryStage (56), Statistics(sizeInBytes=2.8 KiB, rowCount=36)
                     +- Exchange (55)
                        +- * HashAggregate (54)
                           +- * Project (53)
                              +- * BroadcastHashJoin Inner BuildRight (52)
                                 :- * Project (49)
                                 :  +- * BroadcastHashJoin Inner BuildRight (48)
                                 :     :- * Project (42)
                                 :     :  +- * SortMergeJoin Inner (41)
                                 :     :     :- * Sort (33)
                                 :     :     :  +- AQEShuffleRead (32)
                                 :     :     :     +- ShuffleQueryStage (31), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                 :     :     :        +- Exchange (30)
                                 :     :     :           +- * Project (29)
                                 :     :     :              +- * SortMergeJoin Inner (28)
                                 :     :     :                 :- * Sort (20)
                                 :     :     :                 :  +- AQEShuffleRead (19)
                                 :     :     :                 :     +- ShuffleQueryStage (18), Statistics(sizeInBytes=667.5 MiB, rowCount=1.46E+7)
                                 :     :     :                 :        +- Exchange (17)
                                 :     :     :                 :           +- * Project (16)
                                 :     :     :                 :              +- * BroadcastHashJoin Inner BuildLeft (15)  <-- Transformed from SortMergeJoin by AQE
                                 :     :     :                 :                 :- BroadcastQueryStage (8), Statistics(sizeInBytes=8.0 MiB, rowCount=8.00E+4)
                                 :     :     :                 :                 :  +- BroadcastExchange (7)
                                 :     :     :                 :                 :     +- AQEShuffleRead (6)
                                 :     :     :                 :                 :        +- ShuffleQueryStage (5), Statistics(sizeInBytes=1874.1 KiB, rowCount=8.00E+4)
                                 :     :     :                 :                 :           +- Exchange (4)
                                 :     :     :                 :                 :              +- * Filter (3)
                                 :     :     :                 :                 :                 +- * ColumnarToRow (2)
                                 :     :     :                 :                 :                    +- Scan parquet (1)
                                 :     :     :                 :                 +- AQEShuffleRead (14)
                                 :     :     :                 :                    +- ShuffleQueryStage (13), Statistics(sizeInBytes=8.1 GiB, rowCount=1.82E+8)
                                 :     :     :                 :                       +- Exchange (12)
                                 :     :     :                 :                          +- * Filter (11)
                                 :     :     :                 :                             +- * ColumnarToRow (10)
                                 :     :     :                 :                                +- Scan parquet (9)
                                 :     :     :                 +- * Sort (27)
                                 :     :     :                    +- AQEShuffleRead (26)
                                 :     :     :                       +- ShuffleQueryStage (25), Statistics(sizeInBytes=3.4 GiB, rowCount=1.50E+8)
                                 :     :     :                          +- Exchange (24)
                                 :     :     :                             +- * Filter (23)
                                 :     :     :                                +- * ColumnarToRow (22)
                                 :     :     :                                   +- Scan parquet (21)
                                 :     :     +- * Sort (40)
                                 :     :        +- AQEShuffleRead (39)
                                 :     :           +- ShuffleQueryStage (38), Statistics(sizeInBytes=27.5 MiB, rowCount=1.20E+6)
                                 :     :              +- Exchange (37)
                                 :     :                 +- * Filter (36)
                                 :     :                    +- * ColumnarToRow (35)
                                 :     :                       +- Scan parquet (34)
                                 :     +- BroadcastQueryStage (47), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                 :        +- BroadcastExchange (46)
                                 :           +- * Filter (45)
                                 :              +- * ColumnarToRow (44)
                                 :                 +- Scan parquet (43)
                                 +- BroadcastQueryStage (51), Statistics(sizeInBytes=1024.0 KiB, rowCount=2)
                                    +- ReusedExchange (50)
```
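The following query plan is generated when running TPC-H Q7 with Comet
enabled. The innermost CometSortMergeJoin (13) was not transformed into a
CometBroadcastHashJoin, even though the shuffle stage on its left side,
ShuffleQueryStage (4), is only 1281.0 KiB: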
```
== Physical Plan ==
AdaptiveSparkPlan (95)
+- == Final Plan ==
   * CometColumnarToRow (58)
   +- CometSort (57)
      +- AQEShuffleRead (56)
         +- ShuffleQueryStage (55), Statistics(sizeInBytes=288.0 B, rowCount=4)
            +- CometColumnarExchange (54)
               +- CometHashAggregate (53)
                  +- AQEShuffleRead (52)
                     +- ShuffleQueryStage (51), Statistics(sizeInBytes=8.5 KiB, rowCount=72)
                        +- CometExchange (50)
                           +- CometHashAggregate (49)
                              +- CometProject (48)
                                 +- CometBroadcastHashJoin (47)
                                    :- CometProject (44)
                                    :  +- CometBroadcastHashJoin (43)
                                    :     :- CometProject (38)
                                    :     :  +- CometSortMergeJoin (37)
                                    :     :     :- CometSort (30)
                                    :     :     :  +- AQEShuffleRead (29)
                                    :     :     :     +- ShuffleQueryStage (28), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                    :     :     :        +- CometExchange (27)
                                    :     :     :           +- CometProject (26)
                                    :     :     :              +- CometSortMergeJoin (25)
                                    :     :     :                 :- CometSort (18)
                                    :     :     :                 :  +- AQEShuffleRead (17)
                                    :     :     :                 :     +- ShuffleQueryStage (16), Statistics(sizeInBytes=724.0 MiB, rowCount=2.92E+7)
                                    :     :     :                 :        +- CometExchange (15)
                                    :     :     :                 :           +- CometProject (14)
                                    :     :     :                 :              +- CometSortMergeJoin (13)  <-- Not transformed to CometBroadcastHashJoin
                                    :     :     :                 :                 :- CometSort (6)
                                    :     :     :                 :                 :  +- AQEShuffleRead (5)
                                    :     :     :                 :                 :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=1281.0 KiB, rowCount=1.60E+5)
                                    :     :     :                 :                 :        +- CometExchange (3)
                                    :     :     :                 :                 :           +- CometFilter (2)
                                    :     :     :                 :                 :              +- CometScan parquet (1)
                                    :     :     :                 :                 +- CometSort (12)
                                    :     :     :                 :                    +- AQEShuffleRead (11)
                                    :     :     :                 :                       +- ShuffleQueryStage (10), Statistics(sizeInBytes=8.9 GiB, rowCount=3.65E+8)
                                    :     :     :                 :                          +- CometExchange (9)
                                    :     :     :                 :                             +- CometFilter (8)
                                    :     :     :                 :                                +- CometScan parquet (7)
                                    :     :     :                 +- CometSort (24)
                                    :     :     :                    +- AQEShuffleRead (23)
                                    :     :     :                       +- ShuffleQueryStage (22), Statistics(sizeInBytes=2.2 GiB, rowCount=3.00E+8)
                                    :     :     :                          +- CometExchange (21)
                                    :     :     :                             +- CometFilter (20)
                                    :     :     :                                +- CometScan parquet (19)
                                    :     :     +- CometSort (36)
                                    :     :        +- AQEShuffleRead (35)
                                    :     :           +- ShuffleQueryStage (34), Statistics(sizeInBytes=18.7 MiB, rowCount=2.40E+6)
                                    :     :              +- CometExchange (33)
                                    :     :                 +- CometFilter (32)
                                    :     :                    +- CometScan parquet (31)
                                    :     +- BroadcastQueryStage (42), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                    :        +- CometBroadcastExchange (41)
                                    :           +- CometFilter (40)
                                    :              +- CometScan parquet (39)
                                    +- BroadcastQueryStage (46), Statistics(sizeInBytes=337.0 B, rowCount=2)
                                       +- ReusedExchange (45)
```
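To make the size comparison concrete, here is a simplified Python model of the runtime check that lets AQE replace a sort-merge join with a broadcast hash join. It is a sketch, not Spark's implementation: `parse_size`, `can_broadcast` and `pick_build_side` are hypothetical helpers, the threshold assumes the default `spark.sql.autoBroadcastJoinThreshold` of 10 MB (which `spark.sql.adaptive.autoBroadcastJoinThreshold` falls back to when unset), and it assumes the `sizeInBytes` values printed for the shuffle stages are what AQE compares against that threshold.

```python
# Hypothetical, simplified model of the size check behind AQE's
# sort-merge join -> broadcast hash join re-planning. Not Spark's code:
# the real planner also looks at join type, hints, and other rules.

# Binary units, as printed in Statistics(sizeInBytes=...).
UNITS = {"B": 1, "KiB": 1 << 10, "MiB": 1 << 20, "GiB": 1 << 30, "TiB": 1 << 40}

# Assumed default of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_THRESHOLD = 10 * 1024 * 1024


def parse_size(text):
    """Convert a size string such as '1874.1 KiB' into a byte count."""
    value, unit = text.split()
    return int(float(value) * UNITS[unit])


def can_broadcast(size_in_bytes, threshold=DEFAULT_THRESHOLD):
    """A side qualifies if its size is non-negative and within the threshold.
    A negative threshold such as -1 therefore disables broadcasting."""
    return 0 <= size_in_bytes <= threshold


def pick_build_side(left, right, threshold=DEFAULT_THRESHOLD):
    """For an inner join, return 'left', 'right', or None (keep the sort-merge join)."""
    left_ok = can_broadcast(left, threshold)
    right_ok = can_broadcast(right, threshold)
    if left_ok and right_ok:
        # Both sides fit: broadcast the smaller one.
        return "right" if right <= left else "left"
    if left_ok:
        return "left"
    if right_ok:
        return "right"
    return None


# Shuffle sizes of the supplier/lineitem join inputs in the two plans above.
spark_left, spark_right = parse_size("1874.1 KiB"), parse_size("8.1 GiB")  # stages (5), (13)
comet_left, comet_right = parse_size("1281.0 KiB"), parse_size("8.9 GiB")  # stages (4), (10)

print(pick_build_side(spark_left, spark_right))  # left (matches BuildLeft (15))
print(pick_build_side(comet_left, comet_right))  # left, but Comet kept CometSortMergeJoin (13)
```

Under this simplified model, both the Spark stage (1874.1 KiB) and the Comet stage (1281.0 KiB) fall under the threshold, so size alone would not explain why the Comet plan keeps CometSortMergeJoin (13).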
### Steps to reproduce
Run TPC-H Q7 against the TPC-H SF=100 dataset. The benchmarking code is in
https://github.com/apache/datafusion-benchmarks/tree/main/tpch.
```bash
spark-submit \
  --master local[8] \
  --conf spark.driver.memory=3g \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=16g \
  --conf spark.jars=$COMET_JAR \
  --conf spark.driver.extraClassPath=$COMET_JAR \
  --conf spark.executor.extraClassPath=$COMET_JAR \
  --conf spark.sql.extensions=org.apache.comet.CometSparkSessionExtensions \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.shuffle.enabled=true \
  --conf spark.comet.exec.shuffle.mode=auto \
  --conf spark.comet.exec.shuffle.fallbackToColumnar=true \
  --conf spark.comet.exec.shuffle.compression.codec=lz4 \
  --conf spark.comet.exec.replaceSortMergeJoin=false \
  tpcbench.py \
  --benchmark tpch \
  --data /path/to/tpch/sf100_parquet \
  --queries ../../tpch/queries \
  --output tpc-results \
  --iterations 3
```
### Expected behavior
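The innermost CometSortMergeJoin (13) is transformed into a
CometBroadcastHashJoin, just as Spark without Comet turns the corresponding
SortMergeJoin into `BroadcastHashJoin Inner BuildLeft (15)`.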
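The expected behavior can also be checked mechanically by listing the join operators in the `explain("formatted")` output and seeing whether node 13 runs as a broadcast hash join. A minimal Python sketch, assuming the plan text uses the `Name ... (id)` layout shown in the plans above; `find_joins` and `broadcast_ids` are hypothetical helper names, not part of Spark or Comet.

```python
import re

# Hypothetical helper (not part of Spark or Comet): map node IDs to join
# operator names in `explain("formatted")` text. Each line contributes at
# most one entry (the leftmost match), so trailing notes such as
# "<-- Transformed from SortMergeJoin by AQE" do not add extra entries.
JOIN_RE = re.compile(r"(?P<op>[A-Za-z]+Join)\b[^(\n]*\((?P<id>\d+)\)")


def find_joins(plan_text):
    joins = {}
    for line in plan_text.splitlines():
        match = JOIN_RE.search(line)
        if match:
            joins[int(match.group("id"))] = match.group("op")
    return joins


def broadcast_ids(plan_text):
    """Node IDs of joins running as BroadcastHashJoin or CometBroadcastHashJoin."""
    return {node_id for node_id, op in find_joins(plan_text).items()
            if op.endswith("BroadcastHashJoin")}


# Join lines from the two plans above, with the tree prefixes trimmed.
spark_joins = """\
* BroadcastHashJoin Inner BuildRight (52)
* BroadcastHashJoin Inner BuildRight (48)
* SortMergeJoin Inner (41)
* SortMergeJoin Inner (28)
* BroadcastHashJoin Inner BuildLeft (15)  <-- Transformed from SortMergeJoin by AQE
"""
comet_joins = """\
CometBroadcastHashJoin (47)
CometBroadcastHashJoin (43)
CometSortMergeJoin (37)
CometSortMergeJoin (25)
CometSortMergeJoin (13)  <-- Not transformed to CometBroadcastHashJoin
"""

print(sorted(broadcast_ids(spark_joins)))  # [15, 48, 52]
print(sorted(broadcast_ids(comet_joins)))  # [43, 47]; expected to include 13
```

Against the Comet plan in this report, node 13 is absent from the broadcast set; once the issue is fixed it would be expected to appear there, mirroring node 15 in the Spark plan.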
### Additional context
_No response_