Anton Uvarov updated SPARK-45198:
---------------------------------
    Attachment: T2-Details-for-Query.png
                T1-Details-for-Query.png

> problem using broadcast join with parquet/iceberg tables
> --------------------------------------------------------
>
>                 Key: SPARK-45198
>                 URL: https://issues.apache.org/jira/browse/SPARK-45198
>             Project: Spark
>          Issue Type: Bug
>          Components: Build
>    Affects Versions: 3.4.1
>            Reporter: Anton Uvarov
>            Priority: Trivial
>         Attachments: T1-Details-for-Query.png, T2-Details-for-Query.png
>
>   Original Estimate: 612h
>  Remaining Estimate: 612h
>
> We have two Parquet tables: load_test_full_warehouse.gen_document_type and
> load_test_full_warehouse.generation_document_part.
> A left join between these two tables gives a surprising result. When the large
> table load_test_full_warehouse.generation_document_part is on the left side of
> the join, the optimizer uses a broadcast join.
> However, when the small reference table is on the left of the join, the
> optimizer chooses a sort-merge join instead, even though a small table on the
> left of a left join would seem to be an obvious candidate for a broadcast join.
> Neither join hints nor collecting statistics changed the outcome.
> The following queries were used:
>
> spark.sql(f"""create table iceberg_warehouse.t1 using iceberg
> as SELECT /*+ BROADCAST(doc_tp) */
>     doc.DOCUMENT_DATE
>     , doc_tp.NAME as DOCUMENT_TYPE
>     , COUNT(*) as DOC_QTY
> FROM load_test_full_warehouse.generation_document_part doc
> LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
> GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
>
> == Physical Plan ==
> AtomicCreateTableAsSelect (25)
> +- AdaptiveSparkPlan (24)
>    +- == Final Plan ==
>       * HashAggregate (15)
>       +- AQEShuffleRead (14)
>          +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, rowCount=3.12E+5)
>             +- Exchange (12)
>                +- * HashAggregate (11)
>                   +- * Project (10)
>                      +- * BroadcastHashJoin LeftOuter BuildRight (9)
>                         :- * Project (3)
>                         :  +- * ColumnarToRow (2)
>                         :     +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                         +- BroadcastQueryStage (8), Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
>                            +- BroadcastExchange (7)
>                               +- * Filter (6)
>                                  +- * ColumnarToRow (5)
>                                     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
>    +- == Initial Plan ==
>       HashAggregate (23)
>       +- Exchange (22)
>          +- HashAggregate (21)
>             +- Project (20)
>                +- BroadcastHashJoin LeftOuter BuildRight (19)
>                   :- Project (16)
>                   :  +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                   +- BroadcastExchange (18)
>                      +- Filter (17)
>                         +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
>
> spark.sql(f"""create table iceberg_warehouse.t2 using iceberg
> as SELECT /*+ BROADCAST(doc_tp) */
>     doc.DOCUMENT_DATE
>     , doc_tp.NAME as DOCUMENT_TYPE
>     , COUNT(*) as DOC_QTY
> FROM load_test_full_warehouse.gen_document_type doc_tp
> LEFT JOIN load_test_full_warehouse.generation_document_part doc ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
> GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
>
> == Physical Plan ==
> AtomicCreateTableAsSelect (34)
> +- AdaptiveSparkPlan (33)
>    +- == Final Plan ==
>       * HashAggregate (21)
>       +- AQEShuffleRead (20)
>          +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, rowCount=3.10E+4)
>             +- Exchange (18)
>                +- * HashAggregate (17)
>                   +- * Project (16)
>                      +- * SortMergeJoin LeftOuter (15)
>                         :- * Sort (6)
>                         :  +- AQEShuffleRead (5)
>                         :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
>                         :        +- Exchange (3)
>                         :           +- * ColumnarToRow (2)
>                         :              +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
>                         +- * Sort (14)
>                            +- AQEShuffleRead (13)
>                               +- ShuffleQueryStage (12), Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
>                                  +- Exchange (11)
>                                     +- * Project (10)
>                                        +- * Filter (9)
>                                           +- * ColumnarToRow (8)
>                                              +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
>    +- == Initial Plan ==
>       HashAggregate (32)
>       +- Exchange (31)
>          +- HashAggregate (30)
>             +- Project (29)
>                +- SortMergeJoin LeftOuter (28)
>                   :- Sort (23)
>                   :  +- Exchange (22)
>                   :     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
>                   +- Sort (27)
>                      +- Exchange (26)
>                         +- Project (25)
>                            +- Filter (24)
>                               +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
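A likely explanation for the T2 plan (an observation about Spark's planner, not something stated in the ticket): for a LEFT OUTER join, Spark's broadcast hash join can only build, i.e. broadcast, the right side, because every row of the left, row-preserving side must be streamed. In T2 the small table doc_tp sits on the left of the LEFT JOIN, so the BROADCAST(doc_tp) hint names a side the planner is not allowed to build; the hint is dropped and the planner falls back to a sort-merge join. The side-selection rule can be sketched roughly as follows (function names and the join-type strings are illustrative, not Spark's actual API):

```python
# Rough sketch of Spark's equi-join build-side eligibility rules
# (mirrors the logic in JoinSelection; names here are illustrative only).

def can_build_right(join_type: str) -> bool:
    # The right side may be broadcast when the left side's rows are streamed,
    # which includes left outer joins.
    return join_type in {"inner", "cross", "left_outer", "left_semi", "left_anti"}

def can_build_left(join_type: str) -> bool:
    # The left side may be broadcast only when the join does not have to
    # preserve unmatched left rows.
    return join_type in {"inner", "cross", "right_outer"}

def honored_broadcast_side(join_type: str, hint_side: str):
    """Return the side the hint can be applied to, or None if it is dropped."""
    if hint_side == "left" and can_build_left(join_type):
        return "left"
    if hint_side == "right" and can_build_right(join_type):
        return "right"
    return None  # hint cannot be honored; planner falls back to sort-merge join
```

Under this reading, T1 broadcasts gen_document_type because it is on the right side of the LEFT JOIN (hence BuildRight in the plan), while in T2 `honored_broadcast_side("left_outer", "left")` yields no legal build side, matching the observed SortMergeJoin.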