Shubham-kumar-0712 opened a new issue, #15015:
URL: https://github.com/apache/iceberg/issues/15015
### Environment:
* Platform: AWS EMR 7.8.0
* Spark Version: 3.5.0
* Iceberg Version: 1.9.2 (AWS SDK bundle) / Open Source equivalent ~1.5.x
* Distribution Mode: Hash
**Impact:** Storage Partitioned Join (SPJ) fails completely when joining 3+
bucketed tables, preventing shuffle-free optimizations for complex queries.
**Description:** I am attempting a 3-way join on three Iceberg tables. All
three tables are partitioned with bucket(4000, aspk_id) and contain data
sorted/written with hash distribution.
* 2-Way Join: Works correctly (SPJ is triggered, no shuffle).
* 3-Way Join: Fails with `IllegalArgumentException: Can't zip RDDs with
unequal numbers of partitions: List(4000, 8000)`.
I have confirmed that spark.sql.shuffle.partitions is explicitly set to
4000. The error implies Spark believes one side of the join has 8000
partitions, despite all table metadata and configurations confirming 4000.
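For context on why every side of the join should expose exactly 4000 groups: the Iceberg spec defines `bucket(N, x)` as `(murmur3_x86_32(littleEndianBytes(x)) & MAX_INT) % N`, so every key deterministically lands in one of N buckets regardless of which table it is written to. A minimal pure-Python sketch of that transform (the algorithm follows the spec; the function names here are my own):

```python
# Sketch of Iceberg's bucket transform per the Iceberg spec:
# bucket(N, x) = (murmur3_x86_32(littleEndianBytes(x)) & MAX_INT) % N.

def murmur3_x86_32(data: bytes, seed: int = 0) -> int:
    """Standard 32-bit Murmur3 (seed 0, as Iceberg uses); unsigned result."""
    c1, c2 = 0xCC9E2D51, 0x1B873593
    h = seed
    n_blocks = len(data) // 4
    for i in range(n_blocks):
        k = int.from_bytes(data[4 * i:4 * i + 4], "little")
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF  # rotl32(k, 15)
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
        h = ((h << 13) | (h >> 19)) & 0xFFFFFFFF  # rotl32(h, 13)
        h = (h * 5 + 0xE6546B64) & 0xFFFFFFFF
    # Tail bytes (none for 8-byte longs; included for completeness).
    k = 0
    for b in reversed(data[4 * n_blocks:]):
        k = (k << 8) | b
    if len(data) % 4:
        k = (k * c1) & 0xFFFFFFFF
        k = ((k << 15) | (k >> 17)) & 0xFFFFFFFF
        k = (k * c2) & 0xFFFFFFFF
        h ^= k
    # Finalization mix.
    h ^= len(data)
    h ^= h >> 16
    h = (h * 0x85EBCA6B) & 0xFFFFFFFF
    h ^= h >> 13
    h = (h * 0xC2B2AE35) & 0xFFFFFFFF
    h ^= h >> 16
    return h

def iceberg_bucket(value: int, num_buckets: int) -> int:
    """bucket(num_buckets, value) for a long column: hash, mask sign, mod N."""
    h = murmur3_x86_32(value.to_bytes(8, "little", signed=True))
    return (h & 0x7FFFFFFF) % num_buckets

# Every aspk_id maps to exactly one of the 4000 buckets, so all three scans
# should expose the same 4000-way grouping to the planner.
print(iceberg_bucket(34, 4000))
```

Because this mapping is a pure function of the key, the 8000 in the error cannot come from the data itself; it has to come from how the planner combines the grouped partitions.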
### Reproduction Steps:
* Create 3 Iceberg tables (pip_data, home_4000, office_4000) with
bucket(4000, key).
* Insert data using write.distribution-mode=hash.
* Set spark.sql.shuffle.partitions = 4000 and disable AQE.
* Execute a left join chain: A left join B on key left join C on key.
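Sketched in Spark SQL (column lists abbreviated; the DDL is an approximation of my setup):

```
CREATE TABLE glue_catalog.staging.pip_data (
  aspk_id BIGINT,
  p_id STRING
  -- ... remaining columns ...
) USING iceberg
PARTITIONED BY (bucket(4000, aspk_id))
TBLPROPERTIES ('write.distribution-mode' = 'hash');

-- home_4000 and office_4000 are defined the same way. The failing query:
SELECT *
FROM glue_catalog.staging.pip_data a
LEFT JOIN glue_catalog.staging.home_4000 b ON a.aspk_id = b.aspk_id
LEFT JOIN glue_catalog.staging.office_4000 c ON a.aspk_id = c.aspk_id;
```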
### Observed Error:
```
pyspark.errors.exceptions.captured.IllegalArgumentException: Can't zip
RDDs with unequal numbers of partitions: List(4000, 8000)
```
### Spark Configuration:
```
.config("spark.sql.shuffle.partitions", "4000")
.config("spark.sql.adaptive.enabled", "false")
.config("spark.sql.adaptive.coalescePartitions.enabled", "false")
.config("spark.sql.sources.v2.bucketing.enabled", "true")
.config("spark.sql.iceberg.planning.preserve-data-grouping", "true")
.config("spark.sql.join.preferSortMergeJoin", "false")
.config("spark.sql.autoBroadcastJoinThreshold", "-1")
```
### Analysis & Attempts:
1. Metadata Verification: I have verified via SHOW CREATE TABLE that all
three tables are partitioned with exactly 4000 buckets.
2. Cache Clearing: I have run REFRESH TABLE on all inputs to ensure no stale
metadata is cached.
3. Compaction: I have run rewrite_data_files to ensure 1 file per bucket.
(Note: When I previously tested with 8000 buckets, the join only worked after
compaction. For 4000 buckets, even with pristine writes, it fails).
4. Isolation:
* A join B: Success (SPJ Active)
* A join C: Success (SPJ Active)
* A join B join C: Fail
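For reference, the compaction in step 3 was performed with the standard Iceberg `rewrite_data_files` procedure, along these lines (the exact options shown are illustrative), run once per table:

```
CALL glue_catalog.system.rewrite_data_files(
  table => 'staging.home_4000',
  options => map('min-input-files', '1')
);
```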
**Question:** Is there a known limitation in the V2 bucketing / SPJ planning
rule that forces a fallback partition count once the plan contains more than a
single join, even when `spark.sql.shuffle.partitions` is set to match the
bucket count?
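Possibly related: Spark 3.4+ exposes additional SPJ knobs that control how partition values are aligned across join sides. I have not yet tested whether they change the reported partition counts, but they may be relevant here:

```
.config("spark.sql.sources.v2.bucketing.pushPartValues.enabled", "true")
.config("spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled", "false")
```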
### SparkPlan
```
== Physical Plan ==
*(5) Project [p_id#298, id#299, timestamp#300, tz#301, aspk_id#302L,
visit_date#303, yyyymm_pip#304, timestamp_UTC#305, timestamp_at_timezone#306,
o_country#326, o_state#327, h_country#311, h_state#312]
+- *(5) SortMergeJoin [aspk_id#302L], [aspk_id#335L], LeftOuter
:- *(3) Project [p_id#298, id#299, timestamp#300, tz#301, aspk_id#302L,
visit_date#303, yyyymm_pip#304, timestamp_UTC#305, timestamp_at_timezone#306,
h_country#311, h_state#312]
: +- *(3) SortMergeJoin [aspk_id#302L], [aspk_id#320L], LeftOuter
: :- *(1) Sort [aspk_id#302L ASC NULLS FIRST], false, 0
: : +- *(1) ColumnarToRow
: : +- BatchScan glue_catalog.staging.pip_data[p_id#298, id#299,
timestamp#300, tz#301, aspk_id#302L, visit_date#303, yyyymm_pip#304,
timestamp_UTC#305, timestamp_at_timezone#306] glue_catalog.staging.pip_data
(branch=null) [filters=, groupedBy=aspk_id_bucket] RuntimeFilters: []
: +- *(2) Sort [aspk_id#320L ASC NULLS FIRST], false, 0
: +- *(2) ColumnarToRow
: +- BatchScan glue_catalog.staging.home_4000[h_country#311,
h_state#312, aspk_id#320L] glue_catalog.staging.home_4000 (branch=null)
[filters=aspk_id IS NOT NULL, groupedBy=aspk_id_bucket] RuntimeFilters: []
+- *(4) Sort [aspk_id#335L ASC NULLS FIRST], false, 0
+- *(4) ColumnarToRow
+- BatchScan glue_catalog.staging.office_4000[o_country#326,
o_state#327, aspk_id#335L] glue_catalog.staging.office_4000 (branch=null)
[filters=aspk_id IS NOT NULL, groupedBy=aspk_id_bucket] RuntimeFilters: []
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]