[jira] [Updated] (SPARK-45198) problem using broadcast join with parquet/iceberg tables

2023-09-18 Thread Anton Uvarov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Uvarov updated SPARK-45198:
-
Attachment: T2-Details-for-Query.png
T1-Details-for-Query.png

> problem using broadcast join with parquet/iceberg tables
> 
>
> Key: SPARK-45198
> URL: https://issues.apache.org/jira/browse/SPARK-45198
> Project: Spark
>  Issue Type: Bug
>  Components: Build
>Affects Versions: 3.4.1
>Reporter: Anton Uvarov
>Priority: Trivial
> Attachments: T1-Details-for-Query.png, T2-Details-for-Query.png
>
>   Original Estimate: 612h
>  Remaining Estimate: 612h
>
> We have 2 Parquet tables: load_test_full_warehouse.gen_document_type and 
> load_test_full_warehouse.generation_document_part.
> Trying to make a left join of these two tables onto each other gives a 
> strange result. In the case where on the left side of the join we use a large 
> table load_test_full_warehouse.generation_document_part, the optimizer uses a 
> broadcast join.
> However, in the case when on the left in the join we use a small reference 
> table, the optimizer chooses to execute the query using the merge sort. 
> Although it would seem that the small table on the left in a left join should 
> initiate a broadcast join.
>   An attempt to use hints and collect statistics did not yield results. The 
> following queries were used:
> spark.sql(f"""create table iceberg_warehouse.t1 using iceberg 
>               as SELECT /*+ BROADCAST(doc_tp) */
>                 doc.DOCUMENT_DATE
>                 , doc_tp.NAME as DOCUMENT_TYPE
>                 , COUNT(*) as DOC_QTY
>               FROM load_test_full_warehouse.generation_document_part doc
>               LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp ON 
> doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
>               GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
> == Physical Plan ==
> AtomicCreateTableAsSelect (25)
> +- AdaptiveSparkPlan (24)
>    +- == Final Plan ==
>       * HashAggregate (15)
>       +- AQEShuffleRead (14)
>          +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, 
> rowCount=3.12E+5)
>             +- Exchange (12)
>                +- * HashAggregate (11)
>                   +- * Project (10)
>                      +- * BroadcastHashJoin LeftOuter BuildRight (9)
>                         :- * Project (3)
>                         :  +- * ColumnarToRow (2)
>                         :     +- Scan parquet 
> spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                         +- BroadcastQueryStage (8), 
> Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
>                            +- BroadcastExchange (7)
>                               +- * Filter (6)
>                                  +- * ColumnarToRow (5)
>                                     +- Scan parquet 
> spark_catalog.load_test_full_warehouse.gen_document_type (4)
>    +- == Initial Plan ==
>       HashAggregate (23)
>       +- Exchange (22)
>          +- HashAggregate (21)
>             +- Project (20)
>                +- BroadcastHashJoin LeftOuter BuildRight (19)
>                   :- Project (16)
>                   :  +- Scan parquet 
> spark_catalog.load_test_full_warehouse.generation_document_part (1)
>                   +- BroadcastExchange (18)
>                      +- Filter (17)
>                         +- Scan parquet 
> spark_catalog.load_test_full_warehouse.gen_document_type (4)
>  
> spark.sql(f"""create table iceberg_warehouse.t2 using iceberg 
>               as SELECT /*+ BROADCAST(doc_tp) */
>                 doc.DOCUMENT_DATE
>                 , doc_tp.NAME as DOCUMENT_TYPE
>                 , COUNT(*) as DOC_QTY
>               FROM load_test_full_warehouse.gen_document_type doc_tp
>               LEFT JOIN load_test_full_warehouse.generation_document_part doc 
> ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
>               GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")
> == Physical Plan ==
> AtomicCreateTableAsSelect (34)
> +- AdaptiveSparkPlan (33)
>    +- == Final Plan ==
>       * HashAggregate (21)
>       +- AQEShuffleRead (20)
>          +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, 
> rowCount=3.10E+4)
>             +- Exchange (18)
>                +- * HashAggregate (17)
>                   +- * Project (16)
>                      +- * SortMergeJoin LeftOuter (15)
>                         :- * Sort (6)
>                         :  +- AQEShuffleRead (5)
>                         :     +- ShuffleQueryStage (4), 
> Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
>                         :        +- Exchange (3)
>                         :           +- * ColumnarToRow (2)
>                         :              +- Scan parquet 
> 

[jira] [Updated] (SPARK-45198) problem using broadcast join with parquet/iceberg tables

2023-09-18 Thread Anton Uvarov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-45198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Uvarov updated SPARK-45198:
-
Description: 
We have 2 Parquet tables: load_test_full_warehouse.gen_document_type and 
load_test_full_warehouse.generation_document_part.
Trying to make a left join of these two tables onto each other gives a strange 
result. In the case where on the left side of the join we use a large table 
load_test_full_warehouse.generation_document_part, the optimizer uses a 
broadcast join.
However, in the case when on the left in the join we use a small reference 
table, the optimizer chooses to execute the query using the merge sort. 
Although it would seem that the small table on the left in a left join should 
initiate a broadcast join.
  An attempt to use hints and collect statistics did not yield results. The 
following queries were used:

spark.sql(f"""create table iceberg_warehouse.t1 using iceberg 
              as SELECT /*+ BROADCAST(doc_tp) */
                doc.DOCUMENT_DATE
                , doc_tp.NAME as DOCUMENT_TYPE
                , COUNT(*) as DOC_QTY
              FROM load_test_full_warehouse.generation_document_part doc
              LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp ON 
doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
              GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")

== Physical Plan ==
AtomicCreateTableAsSelect (25)
+- AdaptiveSparkPlan (24)
   +- == Final Plan ==
      * HashAggregate (15)
      +- AQEShuffleRead (14)
         +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, 
rowCount=3.12E+5)
            +- Exchange (12)
               +- * HashAggregate (11)
                  +- * Project (10)
                     +- * BroadcastHashJoin LeftOuter BuildRight (9)
                        :- * Project (3)
                        :  +- * ColumnarToRow (2)
                        :     +- Scan parquet 
spark_catalog.load_test_full_warehouse.generation_document_part (1)
                        +- BroadcastQueryStage (8), 
Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
                           +- BroadcastExchange (7)
                              +- * Filter (6)
                                 +- * ColumnarToRow (5)
                                    +- Scan parquet 
spark_catalog.load_test_full_warehouse.gen_document_type (4)
   +- == Initial Plan ==
      HashAggregate (23)
      +- Exchange (22)
         +- HashAggregate (21)
            +- Project (20)
               +- BroadcastHashJoin LeftOuter BuildRight (19)
                  :- Project (16)
                  :  +- Scan parquet 
spark_catalog.load_test_full_warehouse.generation_document_part (1)
                  +- BroadcastExchange (18)
                     +- Filter (17)
                        +- Scan parquet 
spark_catalog.load_test_full_warehouse.gen_document_type (4)

 

spark.sql(f"""create table iceberg_warehouse.t2 using iceberg 
              as SELECT /*+ BROADCAST(doc_tp) */
                doc.DOCUMENT_DATE
                , doc_tp.NAME as DOCUMENT_TYPE
                , COUNT(*) as DOC_QTY
              FROM load_test_full_warehouse.gen_document_type doc_tp
              LEFT JOIN load_test_full_warehouse.generation_document_part doc 
ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
              GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")

== Physical Plan ==
AtomicCreateTableAsSelect (34)
+- AdaptiveSparkPlan (33)
   +- == Final Plan ==
      * HashAggregate (21)
      +- AQEShuffleRead (20)
         +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, 
rowCount=3.10E+4)
            +- Exchange (18)
               +- * HashAggregate (17)
                  +- * Project (16)
                     +- * SortMergeJoin LeftOuter (15)
                        :- * Sort (6)
                        :  +- AQEShuffleRead (5)
                        :     +- ShuffleQueryStage (4), 
Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
                        :        +- Exchange (3)
                        :           +- * ColumnarToRow (2)
                        :              +- Scan parquet 
spark_catalog.load_test_full_warehouse.gen_document_type (1)
                        +- * Sort (14)
                           +- AQEShuffleRead (13)
                              +- ShuffleQueryStage (12), 
Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
                                 +- Exchange (11)
                                    +- * Project (10)
                                       +- * Filter (9)
                                          +- * ColumnarToRow (8)
                                             +- Scan parquet 
spark_catalog.load_test_full_warehouse.generation_document_part (7)
   +- == Initial Plan ==
      HashAggregate (32)
      +- Exchange (31)
         +- HashAggregate (30)
            +- Project (29)
               +- 

[jira] [Created] (SPARK-45198) problem using broadcast join with parquet/iceberg tables

2023-09-18 Thread Anton Uvarov (Jira)
Anton Uvarov created SPARK-45198:


 Summary: problem using broadcast join with parquet/iceberg tables
 Key: SPARK-45198
 URL: https://issues.apache.org/jira/browse/SPARK-45198
 Project: Spark
  Issue Type: Bug
  Components: Build
Affects Versions: 3.4.1
Reporter: Anton Uvarov


We have 2 Parquet tables: load_test_full_warehouse.gen_document_type and 
load_test_full_warehouse.generation_document_part.
Trying to make a left join of these two tables onto each other gives a strange 
result. In the case where on the left side of the join we use a large table 
load_test_full_warehouse.generation_document_part, the optimizer uses a 
broadcast join.
However, in the case when on the left in the join we use a small reference 
table, the optimizer chooses to execute the query using the merge sort. 
Although it would seem that the small table on the left in a left join should 
initiate a broadcast join.
  An attempt to use hints and collect statistics did not yield results. The 
following queries were used:

spark.sql(f"""create table iceberg_warehouse.t1 using iceberg 
              as SELECT /*+ BROADCAST(doc_tp) */
                doc.DOCUMENT_DATE
                , doc_tp.NAME as DOCUMENT_TYPE
                , COUNT(*) as DOC_QTY
              FROM load_test_full_warehouse.generation_document_part doc
              LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp ON 
doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
              GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")

 

spark.sql(f"""create table iceberg_warehouse.t2 using iceberg 
              as SELECT /*+ BROADCAST(doc_tp) */
                doc.DOCUMENT_DATE
                , doc_tp.NAME as DOCUMENT_TYPE
                , COUNT(*) as DOC_QTY
              FROM load_test_full_warehouse.gen_document_type doc_tp
              LEFT JOIN load_test_full_warehouse.generation_document_part doc 
ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
              GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org