ssandona opened a new issue, #13374:
URL: https://github.com/apache/iceberg/issues/13374
### Apache Iceberg version
1.9.0
### Query engine
Spark
### Please describe the bug 🐞
## Issue description
CBO does not seem to use NDVs properly on Iceberg tables; in particular, join reordering is not optimized based on NDVs.
Running exactly the same joins on plain Parquet tables (no Iceberg), populated in the same way, CBO reorders the joins correctly.
So CBO join reordering works on Parquet tables but not on Iceberg tables.
Reproduction steps below:
## Reproduction steps
I populate my Iceberg tables so that I end up with the following setup:
- online_orders_ndv:
- 100,000,000 rows
- order_id column: 100,000,000 NDVs
- total_returns_ndv:
- 2,000,000 rows
- customer column: ~20,000 NDVs
- order_id column: ~1,000,000 NDVs
- locations_ndv:
- 1,000,000 rows
- customer column: ~20,000 NDVs
- store_addr column: 1,000,000 NDVs
```
# Define table names and locations
S3_BASE = "s3://mybucket/datasets/temp"
TABLE_NAMES = {
    'online_orders': 'default.online_orders_ndv',
    'total_returns': 'default.total_returns_ndv',
    'locations': 'default.locations_ndv'
}
TABLE_LOCATIONS = {
    'online_orders': f"{S3_BASE}/online_orders_ndv",
    'total_returns': f"{S3_BASE}/total_returns_ndv",
    'locations': f"{S3_BASE}/locations_ndv"
}
# Create tables and insert data using Spark SQL
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['online_orders']} (
order_id BIGINT
) USING iceberg
LOCATION '{TABLE_LOCATIONS['online_orders']}'
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['total_returns']} (
order_id BIGINT,
customer STRING
) USING iceberg
LOCATION '{TABLE_LOCATIONS['total_returns']}'
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['locations']} (
customer STRING,
store_addr STRING
) USING iceberg
LOCATION '{TABLE_LOCATIONS['locations']}'
""")
# Insert data into online_orders (100M rows)
spark.sql(f"""
INSERT INTO {TABLE_NAMES['online_orders']}
SELECT id as order_id
FROM (
SELECT explode(sequence(1, 100000000)) as id
)
""")
# Insert data into total_returns (2M rows)
# 20K customers with 100 returns each
spark.sql(f"""
WITH customer_base AS (
SELECT explode(sequence(1, 20000)) as customer_id
),
returns_base AS (
SELECT
customer_id,
explode(sequence(1, 100)) as return_seq
FROM customer_base
)
INSERT INTO {TABLE_NAMES['total_returns']}
SELECT
CASE
WHEN rand() < 0.5 THEN CAST(rand() * 100000000 as BIGINT)
ELSE -1
END as order_id,
concat('CUST_', customer_id) as customer
FROM returns_base
""")
# Insert data into locations (1M rows)
# 20K customers with 50 locations each
spark.sql(f"""
WITH customer_base AS (
SELECT explode(sequence(1, 20000)) as customer_id
),
location_base AS (
SELECT
customer_id,
explode(sequence(1, 50)) as store_seq
FROM customer_base
)
INSERT INTO {TABLE_NAMES['locations']}
SELECT
concat('CUST_', customer_id) as customer,
concat('STORE_', customer_id, '_', store_seq) as store_addr
FROM location_base
""")
```
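As an optional sanity check (not part of the original repro), the generated cardinalities can be verified with Spark's `approx_count_distinct`, e.g. for `total_returns_ndv`:
```
# Hypothetical sanity check: confirm row counts and approximate NDVs
# match the intended setup described above.
spark.sql(f"""
    SELECT count(*)                        AS row_count,
           approx_count_distinct(order_id) AS order_id_ndv,
           approx_count_distinct(customer) AS customer_ndv
    FROM {TABLE_NAMES['total_returns']}
""").show()
```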
I then want to test this join:
```
df=spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```
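The join order chosen in each test can also be inspected with the standard `DataFrame.explain` API (a sketch; the plans quoted below were captured separately):
```
# Print the formatted physical plan; mode="cost" additionally shows the
# optimizer's statistics once CBO is enabled.
df.explain(mode="formatted")
```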
## Test 1
Without CBO, with default values:
```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df=spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```
As a result, the joins are executed in the order written in the query (which is actually the best order).
How it was processed:
* RES1:
  * online_orders (100 million) JOIN total_returns (2 million)
  * number of output rows: 998,891 (~1 million)
* Final result:
  * RES1 (998,891) JOIN locations_ndv (1 million)
  * number of output rows: 49,944,550
Here is the plan:
```
+- == Initial Plan ==
Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test,
false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id,
customer, store_addr]
+- WriteFiles
+- Project [order_id#808L, customer#810, store_addr#812]
+- SortMergeJoin [customer#810], [customer#811], Inner
:- Sort [customer#810 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(customer#810, 1000),
ENSURE_REQUIREMENTS, [plan_id=1509]
: +- Project [order_id#808L, customer#810]
: +- SortMergeJoin [order_id#808L], [order_id#809L], Inner
: :- Sort [order_id#808L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(order_id#808L,
1000), ENSURE_REQUIREMENTS, [plan_id=1501]
: : +- Filter isnotnull(order_id#808L)
: : +- BatchScan
spark_catalog.default.online_orders_ndv[order_id#808L]
spark_catalog.default.online_orders_ndv (branch=null) [filters=order_id IS NOT
NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
: +- Sort [order_id#809L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(order_id#809L,
1000), ENSURE_REQUIREMENTS, [plan_id=1502]
: +- Filter (isnotnull(order_id#809L) AND
isnotnull(customer#810))
: +- BatchScan
spark_catalog.default.total_returns_ndv[order_id#809L, customer#810]
spark_catalog.default.total_returns_ndv (branch=null) [filters=order_id IS NOT
NULL, customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
+- Sort [customer#811 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(customer#811, 1000),
ENSURE_REQUIREMENTS, [plan_id=1510]
+- Filter isnotnull(customer#811)
+- BatchScan
spark_catalog.default.locations_ndv[customer#811, store_addr#812]
spark_catalog.default.locations_ndv (branch=null) [filters=customer IS NOT
NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
```
## Test 2 - with CBO - No NDV stats collected
```
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.iceberg.report-column-stats", "true")
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
df=spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```
How it was processed:
* RES1:
  * total_returns_ndv (2 million) JOIN locations_ndv (1 million)
  * number of output rows: 100,000,000
* Final result:
  * RES1 (100 million) JOIN online_orders (100 million)
  * number of output rows: 49,944,550
CBO only looked at the row counts of the tables and assumed (wrongly) that joining the two small tables first would produce fewer output rows.
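For an equi-join, Spark's CBO estimates the output cardinality as roughly |A| * |B| / max(ndv(a), ndv(b)) when per-column NDVs are available; a back-of-the-envelope check (my own sketch, not from the original report) shows why the written query order is better here:
```
# Rough equi-join cardinality estimate used by CBO when NDVs are known:
# |A| * |B| / max(ndv(a), ndv(b))
def est_join_rows(rows_a, rows_b, ndv_a, ndv_b):
    return rows_a * rows_b // max(ndv_a, ndv_b)

# online_orders JOIN total_returns ON order_id -> ~2M rows estimated
print(est_join_rows(100_000_000, 2_000_000, 100_000_000, 1_000_000))
# total_returns JOIN locations ON customer -> ~100M rows estimated
print(est_join_rows(2_000_000, 1_000_000, 20_000, 20_000))
```
With NDVs the optimizer would estimate ~2M intermediate rows for online_orders JOIN total_returns versus ~100M for total_returns JOIN locations and keep the original order; without them it falls back to size-based heuristics, which is what we observe here.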
Here is the plan:
```
+- == Initial Plan ==
Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test,
false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id,
customer, store_addr]
+- WriteFiles
+- Project [order_id#839L, customer#841, store_addr#843]
+- SortMergeJoin [order_id#840L], [order_id#839L], Inner
:- Sort [order_id#840L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(order_id#840L, 1000),
ENSURE_REQUIREMENTS, [plan_id=1923]
: +- Project [order_id#840L, customer#841, store_addr#843]
: +- SortMergeJoin [customer#841], [customer#842], Inner
: :- Sort [customer#841 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(customer#841, 1000),
ENSURE_REQUIREMENTS, [plan_id=1915]
: : +- Filter (isnotnull(order_id#840L) AND
isnotnull(customer#841))
: : +- BatchScan
spark_catalog.default.total_returns_ndv[order_id#840L, customer#841]
spark_catalog.default.total_returns_ndv (branch=null) [filters=order_id IS NOT
NULL, customer IS NOT NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
: +- Sort [customer#842 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(customer#842, 1000),
ENSURE_REQUIREMENTS, [plan_id=1916]
: +- Filter isnotnull(customer#842)
: +- BatchScan
spark_catalog.default.locations_ndv[customer#842, store_addr#843]
spark_catalog.default.locations_ndv (branch=null) [filters=customer IS NOT
NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
+- Sort [order_id#839L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(order_id#839L, 1000),
ENSURE_REQUIREMENTS, [plan_id=1924]
+- Filter isnotnull(order_id#839L)
+- BatchScan
spark_catalog.default.online_orders_ndv[order_id#839L]
spark_catalog.default.online_orders_ndv (branch=null) [filters=order_id IS NOT
NULL, groupedBy=, pushedLimit=None] RuntimeFilters: []
```
## Test 3 - with CBO and NDV stats collected
I collected stats for all the tables as follows:
```
df=spark.sql(f"""
CALL system.compute_table_stats(
table => '{TABLE_NAMES['total_returns']}'
)
"""
)
df=spark.sql(f"""
CALL system.compute_table_stats(
table => '{TABLE_NAMES['online_orders']}'
)
"""
)
df=spark.sql(f"""
CALL system.compute_table_stats(
table => '{TABLE_NAMES['locations']}'
)
"""
)
```
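One way to confirm the procedure actually wrote statistics is to show its result (my assumption: the output references the Puffin statistics file that was written; the exact output column name may differ):
```
# Hypothetical check: re-run the procedure and display its output, which
# should reference the statistics (Puffin) file written for each table.
for name in ('online_orders', 'total_returns', 'locations'):
    spark.sql(f"""
        CALL system.compute_table_stats(table => '{TABLE_NAMES[name]}')
    """).show(truncate=False)
```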
Then I re-attempted the join:
```
spark.sparkContext.setLogLevel("DEBUG")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.iceberg.report-column-stats", "true")
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
df=spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
```
In the logs I see:
```
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for
order_id#150L
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for
order_id#151L
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for
customer#152
25/06/20 10:02:56 DEBUG FilterEstimation: [CBO] No statistics for
customer#153
```
and the query was executed in the same way as in Test 2 (with a non-optimal join order). It seems the NDVs were either not used or did not help CBO.
Note: inspecting the DEBUG logs, I do not see any GET call made to the .stat
file in S3.
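A sketch (not in the original report) for inspecting what statistics the optimizer actually sees, using the standard cost explain mode; with working NDV reporting, the Iceberg relations should show a rowCount and per-column statistics here:
```
# Print the optimized logical plan together with the statistics CBO uses.
df = spark.sql(f"""
    SELECT o.order_id, r.customer, l.store_addr
    FROM {TABLE_NAMES['online_orders']} o
    JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
    JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.explain(mode="cost")
```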
## Test 4 - Parquet tables (no iceberg) + CBO + NDVs
To confirm it's an issue with Iceberg NDVs and not with CBO, I recreated the same tables using plain Parquet:
```
# Define table names and locations
S3_BASE = "s3://mybucket/datasets/temp"
TABLE_NAMES = {
    'online_orders': 'default.online_orders_parquet',
    'total_returns': 'default.total_returns_parquet',
    'locations': 'default.locations_parquet'
}
TABLE_LOCATIONS = {
    'online_orders': f"{S3_BASE}/online_orders_parquet",
    'total_returns': f"{S3_BASE}/total_returns_parquet",
    'locations': f"{S3_BASE}/locations_parquet"
}
# Create tables and insert data using Spark SQL
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['online_orders']} (
order_id BIGINT
) USING parquet
LOCATION '{TABLE_LOCATIONS['online_orders']}'
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['total_returns']} (
order_id BIGINT,
customer STRING
) USING parquet
LOCATION '{TABLE_LOCATIONS['total_returns']}'
""")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAMES['locations']} (
customer STRING,
store_addr STRING
) USING parquet
LOCATION '{TABLE_LOCATIONS['locations']}'
""")
# Insert data into online_orders (100M rows)
spark.sql(f"""
INSERT INTO {TABLE_NAMES['online_orders']}
SELECT id as order_id
FROM (
SELECT explode(sequence(1, 100000000)) as id
)
""")
# Insert data into total_returns (2M rows)
# 20K customers with 100 returns each
spark.sql(f"""
WITH customer_base AS (
SELECT explode(sequence(1, 20000)) as customer_id
),
returns_base AS (
SELECT
customer_id,
explode(sequence(1, 100)) as return_seq
FROM customer_base
)
INSERT INTO {TABLE_NAMES['total_returns']}
SELECT
CASE
WHEN rand() < 0.5 THEN CAST(rand() * 100000000 as BIGINT)
ELSE -1
END as order_id,
concat('CUST_', customer_id) as customer
FROM returns_base
""")
# Insert data into locations (1M rows)
# 20K customers with 50 locations each
spark.sql(f"""
WITH customer_base AS (
SELECT explode(sequence(1, 20000)) as customer_id
),
location_base AS (
SELECT
customer_id,
explode(sequence(1, 50)) as store_seq
FROM customer_base
)
INSERT INTO {TABLE_NAMES['locations']}
SELECT
concat('CUST_', customer_id) as customer,
concat('STORE_', customer_id, '_', store_seq) as store_addr
FROM location_base
""")
```
Then I computed table-level statistics (only row counts and table sizes, no NDVs):
```
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['online_orders']} COMPUTE STATISTICS
"""
).show(truncate=False)
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['total_returns']} COMPUTE STATISTICS
"""
).show(truncate=False)
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['locations']} COMPUTE STATISTICS
"""
).show(truncate=False)
```
We see the collected stats for the 3 tables:
```
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['online_orders']}
"""
).show(truncate=False)
+----------------------------+----------------------------------------------------------------+-------+
|col_name |data_type
|comment|
+----------------------------+----------------------------------------------------------------+-------+
|order_id |bigint
|NULL |
| |
| |
|# Detailed Table Information|
| |
|Catalog |spark_catalog
| |
|Database |default
| |
|Table |online_orders_parquet
| |
|Owner |hadoop
| |
|Created Time |Tue Jun 24 07:59:58 GMT 2025
| |
|Last Access |UNKNOWN
| |
|Created By |Spark 3.5.4-amzn-0
| |
|Type |EXTERNAL
| |
|Provider |parquet
| |
|Statistics |400620383 bytes, 100000000 rows
| |
|Location
|s3://mybucket/datasets/temp/online_orders_parquet| |
|Serde Library
|org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat
|org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat
|org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
+----------------------------+----------------------------------------------------------------+-------+
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['total_returns']}
"""
).show(truncate=False)
+----------------------------+----------------------------------------------------------------+-------+
|col_name |data_type
|comment|
+----------------------------+----------------------------------------------------------------+-------+
|order_id |bigint
|NULL |
|customer |string
|NULL |
| |
| |
|# Detailed Table Information|
| |
|Catalog |spark_catalog
| |
|Database |default
| |
|Table |total_returns_parquet
| |
|Owner |hadoop
| |
|Created Time |Tue Jun 24 07:59:59 GMT 2025
| |
|Last Access |UNKNOWN
| |
|Created By |Spark 3.5.4-amzn-0
| |
|Type |EXTERNAL
| |
|Provider |parquet
| |
|Statistics |7523202 bytes, 2000000 rows
| |
|Location
|s3://mybucket/datasets/temp/total_returns_parquet| |
|Serde Library
|org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat
|org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat
|org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat | |
+----------------------------+----------------------------------------------------------------+-------+
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['locations']}
"""
).show(truncate=False)
+----------------------------+--------------------------------------------------------------+-------+
|col_name |data_type
|comment|
+----------------------------+--------------------------------------------------------------+-------+
|customer |string
|NULL |
|store_addr |string
|NULL |
| |
| |
|# Detailed Table Information|
| |
|Catalog |spark_catalog
| |
|Database |default
| |
|Table |locations_parquet
| |
|Owner |hadoop
| |
|Created Time |Tue Jun 24 08:00:01 GMT 2025
| |
|Last Access |UNKNOWN
| |
|Created By |Spark 3.5.4-amzn-0
| |
|Type |EXTERNAL
| |
|Provider |parquet
| |
|Statistics |4408847 bytes, 1000000 rows
| |
|Location |s3://mybucket/datasets/temp/locations_parquet
| |
|Serde Library
|org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe | |
|InputFormat
|org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat | |
|OutputFormat
|org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat| |
+----------------------------+--------------------------------------------------------------+-------+
```
I re-attempted the query:
```
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df=spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```
As in Test 2, CBO reordered the joins based only on table sizes (wrongly / non-optimal):
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test,
false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id,
customer, store_addr]
+- WriteFiles
+- *(9) Project [order_id#259L, customer#261, store_addr#263]
+- *(9) SortMergeJoin [order_id#260L], [order_id#259L], Inner
:- *(7) Sort [order_id#260L ASC NULLS FIRST], false, 0
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 3
: +- Exchange hashpartitioning(order_id#260L, 1000),
ENSURE_REQUIREMENTS, [plan_id=1905]
: +- *(6) Project [order_id#260L, customer#261,
store_addr#263]
: +- *(6) SortMergeJoin [customer#261],
[customer#262], Inner
: :- *(4) Sort [customer#261 ASC NULLS FIRST],
false, 0
: : +- AQEShuffleRead coalesced
: : +- ShuffleQueryStage 0
: : +- Exchange
hashpartitioning(customer#261, 1000), ENSURE_REQUIREMENTS, [plan_id=1726]
: : +- *(1) Filter
(isnotnull(order_id#260L) AND isnotnull(customer#261))
: : +- *(1) ColumnarToRow
: : +- FileScan parquet
spark_catalog.default.total_returns_parquet[order_id#260L,customer#261]
Batched: true, DataFilters: [isnotnull(order_id#260L),
isnotnull(customer#261)], Format: Parquet, Location: InMemoryFileIndex(1
paths)[s3://mybucket/datasets/temp/total_returns_parquet], PartitionFilters:
[], PushedFilters: [IsNotNull(order_id), IsNotNull(customer)], ReadSchema:
struct<order_id:bigint,customer:string>
: +- *(5) Sort [customer#262 ASC NULLS FIRST],
false, 0
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 1
: +- Exchange
hashpartitioning(customer#262, 1000), ENSURE_REQUIREMENTS, [plan_id=1743]
: +- *(2) Filter
isnotnull(customer#262)
: +- *(2) ColumnarToRow
: +- FileScan parquet
spark_catalog.default.locations_parquet[customer#262,store_addr#263] Batched:
true, DataFilters: [isnotnull(customer#262)], Format: Parquet, Location:
InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/locations_parquet],
PartitionFilters: [], PushedFilters: [IsNotNull(customer)], ReadSchema:
struct<customer:string,store_addr:string>
+- *(8) Sort [order_id#259L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- Exchange hashpartitioning(order_id#259L, 1000),
ENSURE_REQUIREMENTS, [plan_id=1764]
+- *(3) Filter isnotnull(order_id#259L)
+- *(3) ColumnarToRow
+- FileScan parquet
spark_catalog.default.online_orders_parquet[order_id#259L] Batched: true,
DataFilters: [isnotnull(order_id#259L)], Format: Parquet, Location:
InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/online_orders_parquet],
PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema:
struct<order_id:bigint>
```
Then I computed column stats (including NDVs):
```
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['online_orders']} COMPUTE STATISTICS FOR ALL
COLUMNS
"""
).show(truncate=False)
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['total_returns']} COMPUTE STATISTICS FOR ALL
COLUMNS
"""
).show(truncate=False)
spark.sql(f"""
ANALYZE TABLE {TABLE_NAMES['locations']} COMPUTE STATISTICS FOR ALL COLUMNS
"""
).show(truncate=False)
```
We can see the computed column stats:
```
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['online_orders']} order_id
"""
).show(truncate=False)
+--------------+----------+
|info_name |info_value|
+--------------+----------+
|col_name |order_id |
|data_type |bigint |
|comment |NULL |
|min |1 |
|max |100000000 |
|num_nulls |0 |
|distinct_count|100000000 |
|avg_col_len |8 |
|max_col_len |8 |
|histogram |NULL |
+--------------+----------+
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['total_returns']} order_id
"""
).show(truncate=False)
+--------------+----------+
|info_name |info_value|
+--------------+----------+
|col_name |order_id |
|data_type |bigint |
|comment |NULL |
|min |-1 |
|max |99999948 |
|num_nulls |0 |
|distinct_count|1063157 |
|avg_col_len |8 |
|max_col_len |8 |
|histogram |NULL |
+--------------+----------+
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['total_returns']} customer
"""
).show(truncate=False)
+--------------+----------+
|info_name |info_value|
+--------------+----------+
|col_name |customer |
|data_type |string |
|comment |NULL |
|min |NULL |
|max |NULL |
|num_nulls |0 |
|distinct_count|20274 |
|avg_col_len |10 |
|max_col_len |10 |
|histogram |NULL |
+--------------+----------+
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['locations']} customer
"""
).show(truncate=False)
+--------------+----------+
|info_name |info_value|
+--------------+----------+
|col_name |customer |
|data_type |string |
|comment |NULL |
|min |NULL |
|max |NULL |
|num_nulls |0 |
|distinct_count|20274 |
|avg_col_len |10 |
|max_col_len |10 |
|histogram |NULL |
+--------------+----------+
spark.sql(f"""
DESCRIBE EXTENDED {TABLE_NAMES['locations']} store_addr
"""
).show(truncate=False)
+--------------+----------+
|info_name |info_value|
+--------------+----------+
|col_name |store_addr|
|data_type |string |
|comment |NULL |
|min |NULL |
|max |NULL |
|num_nulls |0 |
|distinct_count|1000000 |
|avg_col_len |14 |
|max_col_len |14 |
|histogram |NULL |
+--------------+----------+
```
I re-attempted the query:
```
spark.conf.set("spark.sql.cbo.enabled", "true")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df=spark.sql(f"""
SELECT o.order_id, r.customer, l.store_addr
FROM {TABLE_NAMES['online_orders']} o
JOIN {TABLE_NAMES['total_returns']} r ON o.order_id = r.order_id
JOIN {TABLE_NAMES['locations']} l ON r.customer = l.customer
""")
df.write.mode("overwrite").parquet("s3://mybucket/demo2/ndv_test/")
```
We can see that, with NDVs available, CBO properly reordered the joins:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
Execute InsertIntoHadoopFsRelationCommand s3://mybucket/demo2/ndv_test,
false, Parquet, [path=s3://mybucket/demo2/ndv_test/], Overwrite, [order_id,
customer, store_addr]
+- WriteFiles
+- *(5) Project [order_id#1167L, customer#2914, store_addr#4685]
+- *(5) ShuffledHashJoin [customer#2914], [customer#4684], Inner,
BuildLeft
:- AQEShuffleRead coalesced
: +- ShuffleQueryStage 3
: +- Exchange hashpartitioning(customer#2914, 1000),
ENSURE_REQUIREMENTS, [plan_id=2527]
: +- *(4) Project [order_id#1167L, customer#2914]
: +- *(4) ShuffledHashJoin [order_id#1167L],
[order_id#2913L], Inner, BuildRight
: :- AQEShuffleRead coalesced
: : +- ShuffleQueryStage 0
: : +- Exchange
hashpartitioning(order_id#1167L, 1000), ENSURE_REQUIREMENTS, [plan_id=2306]
: : +- *(1) Filter isnotnull(order_id#1167L)
: : +- *(1) ColumnarToRow
: : +- FileScan parquet
spark_catalog.default.online_orders_parquet[order_id#1167L] Batched: true,
DataFilters: [isnotnull(order_id#1167L)], Format: Parquet, Location:
InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/online_orders_parquet],
PartitionFilters: [], PushedFilters: [IsNotNull(order_id)], ReadSchema:
struct<order_id:bigint>
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 1
: +- Exchange
hashpartitioning(order_id#2913L, 1000), ENSURE_REQUIREMENTS, [plan_id=2323]
: +- *(2) Filter
(isnotnull(order_id#2913L) AND isnotnull(customer#2914))
: +- *(2) ColumnarToRow
: +- FileScan parquet
spark_catalog.default.total_returns_parquet[order_id#2913L,customer#2914]
Batched: true, DataFilters: [isnotnull(order_id#2913L),
isnotnull(customer#2914)], Format: Parquet, Location: InMemoryFileIndex(1
paths)[s3://mybucket/datasets/temp/total_returns_parquet], PartitionFilters:
[], PushedFilters: [IsNotNull(order_id), IsNotNull(customer)], ReadSchema:
struct<order_id:bigint,customer:string>
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- Exchange hashpartitioning(customer#4684, 1000),
ENSURE_REQUIREMENTS, [plan_id=2344]
+- *(3) Filter isnotnull(customer#4684)
+- *(3) ColumnarToRow
+- FileScan parquet
spark_catalog.default.locations_parquet[customer#4684,store_addr#4685] Batched:
true, DataFilters: [isnotnull(customer#4684)], Format: Parquet, Location:
InMemoryFileIndex(1 paths)[s3://mybucket/datasets/temp/locations_parquet],
PartitionFilters: [], PushedFilters: [IsNotNull(customer)], ReadSchema:
struct<customer:string,store_addr:string>
```
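For comparison, the same cost-mode explain as in Test 3 can be run against the Parquet tables (a sketch, reusing the `TABLE_NAMES` and `df` of Test 4); here the optimized plan statistics do reflect the ANALYZE-collected row counts and distinct counts, which is what allows the reorder:
```
# With the Hive/Parquet column statistics in place, the relations in the
# optimized plan report rowCount and column-level stats to CBO.
df.explain(mode="cost")
```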
### Willingness to contribute
- [ ] I can contribute a fix for this bug independently
- [ ] I would be willing to contribute a fix for this bug with guidance from
the Iceberg community
- [x] I cannot contribute a fix for this bug at this time