[ https://issues.apache.org/jira/browse/SPARK-45198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Uvarov updated SPARK-45198:
---------------------------------
    Attachment: T2-Details-for-Query.png
                T1-Details-for-Query.png

> Problem using broadcast join with Parquet/Iceberg tables
> --------------------------------------------------------
>
>                 Key: SPARK-45198
>                 URL: https://issues.apache.org/jira/browse/SPARK-45198
>             Project: Spark
>          Issue Type: Bug
>          Components: Build
>    Affects Versions: 3.4.1
>            Reporter: Anton Uvarov
>            Priority: Trivial
>         Attachments: T1-Details-for-Query.png, T2-Details-for-Query.png
>
>   Original Estimate: 612h
>  Remaining Estimate: 612h
>
> We have two Parquet tables: load_test_full_warehouse.gen_document_type and 
> load_test_full_warehouse.generation_document_part.
> Left-joining these two tables produces a surprising result. When the large 
> table load_test_full_warehouse.generation_document_part is on the left side 
> of the join, the optimizer uses a broadcast join.
> However, when the small reference table is on the left, the optimizer 
> executes the query with a sort-merge join, even though a small table on the 
> left of a left join would seem to be an obvious candidate for broadcasting.
> Neither hints nor collecting statistics changed the outcome. The following 
> queries were used:
> spark.sql(f"""create table iceberg_warehouse.t1 using iceberg 
>               as SELECT /*+ BROADCAST(doc_tp) */
>                 doc.DOCUMENT_DATE
>                 , doc_tp.NAME as DOCUMENT_TYPE
>                 , COUNT(*) as DOC_QTY
>               FROM load_test_full_warehouse.generation_document_part doc
>               LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp
>                 ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
>               GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
> == Physical Plan ==
> AtomicCreateTableAsSelect (25)
> +- AdaptiveSparkPlan (24)
>    +- == Final Plan ==
>       * HashAggregate (15)
>       +- AQEShuffleRead (14)
>             +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, rowCount=3.12E+5)
>             +- Exchange (12)
>                +- * HashAggregate (11)
>                   +- * Project (10)
>                      +- * BroadcastHashJoin LeftOuter BuildRight (9)
>                         :- * Project (3)
>                         :  +- * ColumnarToRow (2)
>                         :     +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                         +- BroadcastQueryStage (8), Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
>                            +- BroadcastExchange (7)
>                               +- * Filter (6)
>                                  +- * ColumnarToRow (5)
>                                     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
>    +- == Initial Plan ==
>       HashAggregate (23)
>       +- Exchange (22)
>          +- HashAggregate (21)
>             +- Project (20)
>                +- BroadcastHashJoin LeftOuter BuildRight (19)
>                   :- Project (16)
>                   :  +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                   +- BroadcastExchange (18)
>                      +- Filter (17)
>                         +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
>  
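> For completeness, the statistics collection mentioned above was along these 
> lines (a sketch; spark.sql.autoBroadcastJoinThreshold and 
> spark.sql.adaptive.enabled are standard Spark SQL settings, and the values 
> shown depend on the cluster configuration):
> 
> # Check the settings that gate broadcast-join selection
> print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # default 10 MB
> print(spark.conf.get("spark.sql.adaptive.enabled"))            # AQE on/off
> # Refresh table-level statistics so the optimizer sees current sizes
> spark.sql("ANALYZE TABLE load_test_full_warehouse.gen_document_type "
>           "COMPUTE STATISTICS")
> spark.sql("ANALYZE TABLE load_test_full_warehouse.generation_document_part "
>           "COMPUTE STATISTICS")
> 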
> spark.sql(f"""create table iceberg_warehouse.t2 using iceberg 
>               as SELECT /*+ BROADCAST(doc_tp) */
>                 doc.DOCUMENT_DATE
>                 , doc_tp.NAME as DOCUMENT_TYPE
>                 , COUNT(*) as DOC_QTY
>               FROM load_test_full_warehouse.gen_document_type doc_tp
>               LEFT JOIN load_test_full_warehouse.generation_document_part doc
>                 ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
>               GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
> == Physical Plan ==
> AtomicCreateTableAsSelect (34)
> +- AdaptiveSparkPlan (33)
>    +- == Final Plan ==
>       * HashAggregate (21)
>       +- AQEShuffleRead (20)
>          +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, rowCount=3.10E+4)
>             +- Exchange (18)
>                +- * HashAggregate (17)
>                   +- * Project (16)
>                      +- * SortMergeJoin LeftOuter (15)
>                         :- * Sort (6)
>                         :  +- AQEShuffleRead (5)
>                         :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
>                         :        +- Exchange (3)
>                         :           +- * ColumnarToRow (2)
>                         :              +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
>                         +- * Sort (14)
>                            +- AQEShuffleRead (13)
>                               +- ShuffleQueryStage (12), Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
>                                  +- Exchange (11)
>                                     +- * Project (10)
>                                        +- * Filter (9)
>                                           +- * ColumnarToRow (8)
>                                              +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
>    +- == Initial Plan ==
>       HashAggregate (32)
>       +- Exchange (31)
>          +- HashAggregate (30)
>             +- Project (29)
>                +- SortMergeJoin LeftOuter (28)
>                   :- Sort (23)
>                   :  +- Exchange (22)
>                   :     +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
>                   +- Sort (27)
>                      +- Exchange (26)
>                         +- Project (25)
>                            +- Filter (24)
>                               +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
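> 
> The same comparison can be reproduced through the DataFrame API (a sketch, 
> assuming the same tables are readable via spark.table; 
> DataFrame.hint("broadcast") and explain(mode="formatted") are standard 
> Spark 3.x API):
> 
> doc_tp = spark.table("load_test_full_warehouse.gen_document_type")
> doc = spark.table("load_test_full_warehouse.generation_document_part")
> # Hint the small (left) table for broadcast, then inspect which join
> # strategy the planner actually picks
> (doc_tp.hint("broadcast")
>        .join(doc, "DOCUMENT_TYPE_ID_INT", "left")
>        .explain(mode="formatted"))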


