[jira] [Updated] (SPARK-45198) problem using broadcast join with parquet/iceberg tables
[ https://issues.apache.org/jira/browse/SPARK-45198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Uvarov updated SPARK-45198:
---------------------------------
    Attachment: T2-Details-for-Query.png
                T1-Details-for-Query.png

                 Key: SPARK-45198
                 URL: https://issues.apache.org/jira/browse/SPARK-45198
             Project: Spark
          Issue Type: Bug
          Components: Build
    Affects Versions: 3.4.1
            Reporter: Anton Uvarov
            Priority: Trivial
         Attachments: T1-Details-for-Query.png, T2-Details-for-Query.png
   Original Estimate: 612h
  Remaining Estimate: 612h

We have two Parquet tables: load_test_full_warehouse.gen_document_type and load_test_full_warehouse.generation_document_part. Left-joining them gives a surprising result. When the large table, generation_document_part, is on the left side of the join, the optimizer uses a broadcast hash join. When the small reference table is on the left, however, the optimizer executes the query with a sort-merge join, even though a small table on the left of a left join would seem to be an obvious candidate for broadcasting. Attempts to use hints and to collect statistics did not change the plan.
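One plausible explanation for the asymmetry, consistent with the two plans below: Spark's BroadcastHashJoin can only build (broadcast) the right side of a LEFT OUTER join, so the BROADCAST(doc_tp) hint cannot be honored when doc_tp is the left (preserved) table, and the only side eligible for broadcasting is the huge right one. A minimal Python model of that rule (a sketch under these assumptions, not Spark's actual JoinSelection code; the byte sizes are illustrative):

```python
# Simplified model of Spark's equi-join strategy choice. Assumption: for a
# LEFT OUTER join BroadcastHashJoin may only build/broadcast the RIGHT side,
# and for a RIGHT OUTER join only the LEFT side. This is a sketch, not the
# real JoinSelection implementation.

DEFAULT_THRESHOLD = 10 * 1024 * 1024  # spark.sql.autoBroadcastJoinThreshold default (10 MB)

def choose_join(join_type, left_bytes, right_bytes, threshold=DEFAULT_THRESHOLD):
    can_build_left = join_type in ("inner", "right_outer")
    can_build_right = join_type in ("inner", "left_outer")
    if can_build_right and right_bytes <= threshold:
        return "BroadcastHashJoin BuildRight"
    if can_build_left and left_bytes <= threshold:
        return "BroadcastHashJoin BuildLeft"
    return "SortMergeJoin"

# T1: big table LEFT JOIN small table -> the small RIGHT side is broadcastable.
print(choose_join("left_outer", left_bytes=252 * 2**30, right_bytes=1032 * 2**10))
# prints "BroadcastHashJoin BuildRight"

# T2: small table LEFT JOIN big table -> only the huge RIGHT side could be
# broadcast, so the model falls back to a sort-merge join despite any hint.
print(choose_join("left_outer", left_bytes=47 * 2**10, right_bytes=252 * 2**30))
# prints "SortMergeJoin"
```

Under this model the T2 behavior is not a missed optimization but a consequence of outer-join semantics: every row of the preserved (small) side must be checked against all partitions of the other table, which a broadcast of the small side cannot support in a plain hash join.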
The following queries were used:

spark.sql(f"""create table iceberg_warehouse.t1 using iceberg
as SELECT /*+ BROADCAST(doc_tp) */
       doc.DOCUMENT_DATE
     , doc_tp.NAME as DOCUMENT_TYPE
     , COUNT(*) as DOC_QTY
FROM load_test_full_warehouse.generation_document_part doc
LEFT JOIN load_test_full_warehouse.gen_document_type doc_tp
  ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")

== Physical Plan ==
AtomicCreateTableAsSelect (25)
+- AdaptiveSparkPlan (24)
   +- == Final Plan ==
      * HashAggregate (15)
      +- AQEShuffleRead (14)
         +- ShuffleQueryStage (13), Statistics(sizeInBytes=16.7 MiB, rowCount=3.12E+5)
            +- Exchange (12)
               +- * HashAggregate (11)
                  +- * Project (10)
                     +- * BroadcastHashJoin LeftOuter BuildRight (9)
                        :- * Project (3)
                        :  +- * ColumnarToRow (2)
                        :     +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
                        +- BroadcastQueryStage (8), Statistics(sizeInBytes=1031.8 KiB, rowCount=1.00E+3)
                           +- BroadcastExchange (7)
                              +- * Filter (6)
                                 +- * ColumnarToRow (5)
                                    +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)
   +- == Initial Plan ==
      HashAggregate (23)
      +- Exchange (22)
         +- HashAggregate (21)
            +- Project (20)
               +- BroadcastHashJoin LeftOuter BuildRight (19)
                  :- Project (16)
                  :  +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (1)
                  +- BroadcastExchange (18)
                     +- Filter (17)
                        +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (4)

spark.sql(f"""create table iceberg_warehouse.t2 using iceberg
as SELECT /*+ BROADCAST(doc_tp) */
       doc.DOCUMENT_DATE
     , doc_tp.NAME as DOCUMENT_TYPE
     , COUNT(*) as DOC_QTY
FROM load_test_full_warehouse.gen_document_type doc_tp
LEFT JOIN load_test_full_warehouse.generation_document_part doc
  ON doc.DOCUMENT_TYPE_ID_INT = doc_tp.DOCUMENT_TYPE_ID_INT
GROUP BY doc.DOCUMENT_DATE, doc_tp.NAME""")

== Physical Plan ==
AtomicCreateTableAsSelect (34)
+- AdaptiveSparkPlan (33)
   +- == Final Plan ==
      * HashAggregate (21)
      +- AQEShuffleRead (20)
         +- ShuffleQueryStage (19), Statistics(sizeInBytes=1695.3 KiB, rowCount=3.10E+4)
            +- Exchange (18)
               +- * HashAggregate (17)
                  +- * Project (16)
                     +- * SortMergeJoin LeftOuter (15)
                        :- * Sort (6)
                        :  +- AQEShuffleRead (5)
                        :     +- ShuffleQueryStage (4), Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
                        :        +- Exchange (3)
                        :           +- * ColumnarToRow (2)
                        :              +- Scan parquet spark_catalog.load_test_full_warehouse.gen_document_type (1)
                        +- * Sort (14)
                           +- AQEShuffleRead (13)
                              +- ShuffleQueryStage (12), Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
                                 +- Exchange (11)
                                    +- * Project (10)
                                       +- * Filter (9)
                                          +- * ColumnarToRow (8)
                                             +- Scan parquet spark_catalog.load_test_full_warehouse.generation_document_part (7)
   +- == Initial Plan ==
      HashAggregate (32)
      +- Exchange (31)
         +- HashAggregate (30)
            +- Project (29)
               +-
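The Statistics annotations in the T2 plan quantify the asymmetry: the reference table shuffles at 46.9 KiB while the fact table shuffles at 234.7 GiB. A small hypothetical helper (not part of Spark) to pull those sizes out of an explain dump and compare them with the default 10 MiB broadcast threshold:

```python
import re

# Hypothetical helper: extract sizeInBytes values from the Statistics(...)
# annotations in an EXPLAIN dump and convert them to bytes.
UNITS = {"B": 1, "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40}

def sizes_in_bytes(plan_text):
    matches = re.findall(r"sizeInBytes=([\d.]+)\s*(KiB|MiB|GiB|TiB|B)", plan_text)
    return [float(num) * UNITS[unit] for num, unit in matches]

# The two join inputs from the T2 plan above.
t2_inputs = """
ShuffleQueryStage (4), Statistics(sizeInBytes=46.9 KiB, rowCount=1.00E+3)
ShuffleQueryStage (12), Statistics(sizeInBytes=234.7 GiB, rowCount=1.05E+10)
"""

small, big = sizes_in_bytes(t2_inputs)
threshold = 10 * 2**20  # spark.sql.autoBroadcastJoinThreshold default (10 MiB)
print(small <= threshold)  # True:  the reference table fits comfortably
print(big <= threshold)    # False: the fact table is ~24000x over the limit
```

Raising the threshold would not help here: broadcasting a 234.7 GiB build side is not feasible, so the sort-merge plan for T2 is arguably the only viable one.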
[jira] [Created] (SPARK-45198) problem using broadcast join with parquet/iceberg tables
Anton Uvarov created SPARK-45198:

             Summary: problem using broadcast join with parquet/iceberg tables
                 Key: SPARK-45198
                 URL: https://issues.apache.org/jira/browse/SPARK-45198
             Project: Spark
          Issue Type: Bug
          Components: Build
    Affects Versions: 3.4.1
            Reporter: Anton Uvarov

--
This message was sent by Atlassian Jira (v8.20.10#820010)