anuragmantri opened a new pull request, #16305: URL: https://github.com/apache/iceberg/pull/16305
This PR adds a K-way merge compaction strategy to the RewriteDataFiles
action. K-way merge rewrites pre-sorted data files by streaming-merging them in
sort-key order without shuffle, preserving the table's sort order in output
files.
K-way merge is intended for tables that are already sorted but have
accumulated multiple overlapping files per partition (e.g., after daily
ingestion into a previously sort-compacted table). It achieves the same output
as SORT but eliminates shuffle and spill entirely.
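The merge idea can be shown in plain Java. This is a minimal sketch, not the PR's SortedMerge class. It assumes every input iterator is already sorted by the same comparator. The names KWayMerge and merge are hypothetical.

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

/** Hypothetical illustration of a streaming k-way merge; not the PR's SortedMerge. */
final class KWayMerge {
  private KWayMerge() {}

  /** Current head row of one input, plus the iterator it came from. */
  private static final class Head<T> {
    final T row;
    final Iterator<T> source;

    Head(T row, Iterator<T> source) {
      this.row = row;
      this.source = source;
    }
  }

  /** Merges already-sorted iterators into one sorted iterator without buffering whole inputs. */
  static <T> Iterator<T> merge(List<Iterator<T>> inputs, Comparator<? super T> order) {
    // Min-heap ordered by each input's current head row.
    PriorityQueue<Head<T>> heap =
        new PriorityQueue<Head<T>>((a, b) -> order.compare(a.row, b.row));
    // Seed the heap with the first row of every non-empty input.
    for (Iterator<T> input : inputs) {
      if (input.hasNext()) {
        heap.add(new Head<T>(input.next(), input));
      }
    }
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return !heap.isEmpty();
      }

      @Override
      public T next() {
        if (heap.isEmpty()) {
          throw new NoSuchElementException();
        }
        // Emit the smallest head, then refill from the same input so the heap
        // holds at most one pending row per input.
        Head<T> smallest = heap.poll();
        if (smallest.source.hasNext()) {
          heap.add(new Head<T>(smallest.source.next(), smallest.source));
        }
        return smallest.row;
      }
    };
  }
}
```

For example, merging iterators over [1, 4, 7], [2, 5, 8] and [3, 6, 9] with the natural ordering yields 1 through 9 in order. The heap only holds the current head row of each input. Memory therefore grows with the number of files k, not with the number of rows, and each output row costs O(log k) comparisons. Because of this, sorted inputs need no shuffle or spill.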
**Implementation summary:**
1. API: A new kWayMerge() default method on the RewriteDataFiles interface.
The table must have a defined sort order.
2. Planner (KWayMergeRewriteFilePlanner): Extends
SizeBasedFileRewritePlanner with sort-key-ordered file grouping. Before
bin-packing, files are sorted by their lower bounds on the first sort field,
so that each group covers a contiguous key range. The planner calls
includeColumnStats() on the table scan so that file bounds are available for
planning.
3. Runner (SparkKWayMergeFileRewriteRunner): Opens all files in a group as
streaming iterators, applies GenericDeleteFilter for position/equality
deletes, then merges using SortedMerge (priority queue). Output is written via
GenericAppenderFactory with size-based file rotation and partition-change
detection.
4. Range parallelism: For large groups, the runner splits files into ranges
by total size and processes the ranges in parallel via jsc.parallelize(). Range
assignments are broadcast to executors. This behavior is controlled by the
range-parallelism-enabled (default true), ranges-per-group (default 25), and
min-files-for-range-parallelism (default 10) options.
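The planning-side logic in items 2 and 4 can be sketched in plain Java. This is a minimal illustration that rests on several assumptions:

- The first sort field is a long column.
- Files are represented by a hypothetical FileInfo holder, not by Iceberg's DataFile.
- Grouping is a simple greedy pack of consecutive files up to a byte limit. The real SizeBasedFileRewritePlanner has more options.
- Range splitting cuts a key-ordered file list into slices of roughly equal size.
- A group is only split when it has at least min-files-for-range-parallelism files.

KWayPlanningSketch, FileInfo, planGroups, splitIntoRanges and rangesFor are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch of sort-key-ordered grouping and size-based range splitting. */
final class KWayPlanningSketch {
  private KWayPlanningSketch() {}

  /** Hypothetical stand-in for a data file's planning metadata; not an Iceberg class. */
  static final class FileInfo {
    final String path;
    final long sizeBytes;
    final long firstSortKeyLowerBound; // assumption: first sort field is a long column

    FileInfo(String path, long sizeBytes, long firstSortKeyLowerBound) {
      this.path = path;
      this.sizeBytes = sizeBytes;
      this.firstSortKeyLowerBound = firstSortKeyLowerBound;
    }
  }

  /** Sorts files by lower bound on the first sort field, then greedily packs consecutive files. */
  static List<List<FileInfo>> planGroups(List<FileInfo> files, long maxGroupBytes) {
    List<FileInfo> sorted = new ArrayList<>(files);
    sorted.sort(Comparator.comparingLong(f -> f.firstSortKeyLowerBound));
    List<List<FileInfo>> groups = new ArrayList<>();
    List<FileInfo> current = new ArrayList<>();
    long currentBytes = 0;
    for (FileInfo f : sorted) {
      // Close the current group when adding this file would exceed the size limit.
      if (!current.isEmpty() && currentBytes + f.sizeBytes > maxGroupBytes) {
        groups.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(f);
      currentBytes += f.sizeBytes;
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  /** Splits key-ordered files into at most numRanges slices of roughly equal total size. */
  static List<List<FileInfo>> splitIntoRanges(List<FileInfo> group, int numRanges) {
    long total = 0;
    for (FileInfo f : group) {
      total += f.sizeBytes;
    }
    long target = Math.max(1, (total + numRanges - 1) / numRanges); // ceiling division
    List<List<FileInfo>> ranges = new ArrayList<>();
    List<FileInfo> current = new ArrayList<>();
    long currentBytes = 0;
    for (FileInfo f : group) {
      current.add(f);
      currentBytes += f.sizeBytes;
      // Cut a range once it reaches the target; the last range takes the remainder.
      if (currentBytes >= target && ranges.size() < numRanges - 1) {
        ranges.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
    }
    if (!current.isEmpty()) {
      ranges.add(current);
    }
    return ranges;
  }

  /** Applies the enable flag and minimum file count before splitting a group. */
  static List<List<FileInfo>> rangesFor(
      List<FileInfo> group, boolean enabled, int rangesPerGroup, int minFiles) {
    if (!enabled || group.size() < minFiles) {
      List<List<FileInfo>> single = new ArrayList<>();
      single.add(new ArrayList<>(group));
      return single;
    }
    return splitIntoRanges(group, rangesPerGroup);
  }
}
```

Each range is merged independently, so every output file is sorted. Output files from different ranges can still overlap in key range. Sorting by lower bound before packing tends to keep neighboring key ranges in the same group.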
**Constraints:**
1. All input files must have a valid sort_order_id > 0 matching the table's
sort order. Files without sort metadata are rejected with a clear error.
2. Uses Iceberg's generic reader/writer stack (row-by-row) rather than Spark's
vectorized path. This means higher per-record overhead than SORT, but zero
shuffle I/O.
3. Output files are individually sorted. When range parallelism is enabled,
files from different ranges may have overlapping key ranges (consistent with
how SORT behaves).
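The first constraint amounts to a pre-flight check on every input file. The sketch below is hedged and makes two assumptions:

- Each file's sort order ID is modeled on the nullable sortOrderId() that Iceberg data files expose.
- The table's current sort order ID is passed in as a plain int.

SortOrderCheck and InputFile are hypothetical names, and the error messages are illustrative rather than the PR's. Iceberg reserves sort order ID 0 for the unsorted order, which is why only IDs greater than 0 are accepted.

```java
import java.util.List;

/** Hypothetical pre-flight validation for k-way merge inputs. */
final class SortOrderCheck {
  private SortOrderCheck() {}

  /** Hypothetical minimal view of a data file; not Iceberg's DataFile. */
  static final class InputFile {
    final String path;
    final Integer sortOrderId; // null when the writer recorded no sort order

    InputFile(String path, Integer sortOrderId) {
      this.path = path;
      this.sortOrderId = sortOrderId;
    }
  }

  /**
   * Throws if the table is unsorted, or if any file lacks a sort order ID greater than 0
   * or carries an ID different from the table's current sort order ID.
   */
  static void validate(List<InputFile> files, int tableSortOrderId) {
    // Sort order ID 0 means "unsorted", so the table needs an ID greater than 0.
    if (tableSortOrderId <= 0) {
      throw new IllegalArgumentException("K-way merge requires a table with a defined sort order");
    }
    for (InputFile file : files) {
      if (file.sortOrderId == null || file.sortOrderId.intValue() <= 0) {
        throw new IllegalArgumentException(
            "Cannot k-way merge " + file.path + ": missing sort order ID (found "
                + file.sortOrderId + ")");
      }
      if (file.sortOrderId.intValue() != tableSortOrderId) {
        throw new IllegalArgumentException(
            "Cannot k-way merge " + file.path + ": sort order ID " + file.sortOrderId
                + " does not match table sort order ID " + tableSortOrderId);
      }
    }
  }
}
```

Failing fast matters here. Merging files that are not actually sorted by the table's key would not raise an error, but it would produce unsorted output.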
**Usage:**
```sql
-- Via procedure
CALL catalog.system.rewrite_data_files(
  table => 'db.my_table',
  strategy => 'k-way-merge',
  options => map('max-concurrent-file-group-rewrites', '100')
)
```
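```java
// Via action API (assumes `table` is an already-loaded org.apache.iceberg.Table)
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.spark.actions.SparkActions;

RewriteDataFiles.Result result =
    SparkActions.get()
        .rewriteDataFiles(table)
        .kWayMerge()
        .option("max-concurrent-file-group-rewrites", "100")
        .execute();
```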
AI Usage: I used Claude Opus 4.7 for code generation, test writing, and
review. I manually reviewed and validated all generated code.
