atognolag opened a new pull request, #38406:
URL: https://github.com/apache/beam/pull/38406
# Pull Request: High-Performance Sorted Writing Support in Native IcebergIO
## Description
This pull request implements robust, high-performance sorted writing support
in native `IcebergIO` (`sdks/java/io/iceberg`).
Writing unsorted data to a sorted Iceberg table causes severe performance
degradation, high memory overhead, and "file thrashing", as too many
concurrent file writers are kept open on workers. By dynamically pre-sorting
incoming `PCollection<Row>` elements against the table's active `SortOrder`
inside the write transform, we produce fully ordered Parquet files, conserve
worker resources, and reduce the number of concurrent file handles.
---
## Core Technical Implementation
### 1. Memory-Safe Spill-to-Disk Sorter (`IcebergRowSorter.java`)
- Integrates Beam's native `:sdks:java:extensions:sorter` (via
`BufferedExternalSorter`).
- Sorts rows dynamically per partition. If the in-memory buffer is exceeded,
the sorter automatically spills elements to the worker's local disk, enforcing
strict memory guardrails against out-of-memory (`OOM`) crashes.
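The spill-to-disk behavior can be sketched with just the JDK. This is a minimal illustration of the idea only, not the actual implementation, which delegates to Beam's `BufferedExternalSorter`; the class name, byte budget, run-file format, and reload-then-sort merge below are all illustrative assumptions:

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of a memory-bounded sorter that spills sorted runs to disk. */
public class SpillSorterSketch {
  private final long memoryBudgetBytes;
  private long bufferedBytes = 0;
  private final List<byte[]> inMemory = new ArrayList<>();
  private final List<Path> spillRuns = new ArrayList<>();

  SpillSorterSketch(long memoryBudgetBytes) {
    this.memoryBudgetBytes = memoryBudgetBytes;
  }

  /** Unsigned lexicographic byte[] order, matching the encoded sort keys. */
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) return c;
    }
    return Integer.compare(a.length, b.length);
  }

  /** Buffer a record; spill a sorted run to local disk once over budget. */
  void add(byte[] record) {
    inMemory.add(record);
    bufferedBytes += record.length;
    if (bufferedBytes > memoryBudgetBytes) {
      spill();
    }
  }

  private void spill() {
    inMemory.sort(SpillSorterSketch::compareUnsigned);
    try {
      Path run = Files.createTempFile("sort-run", ".bin");
      try (DataOutputStream out =
          new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(run)))) {
        for (byte[] r : inMemory) {
          out.writeInt(r.length); // length-prefixed records
          out.write(r);
        }
      }
      spillRuns.add(run);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    inMemory.clear();
    bufferedBytes = 0;
  }

  /** Combine spilled runs with the in-memory tail. A production sorter streams
   *  a k-way merge over the runs; this sketch reloads them to stay short. */
  List<byte[]> sort() {
    List<byte[]> all = new ArrayList<>(inMemory);
    try {
      for (Path run : spillRuns) {
        try (DataInputStream in =
            new DataInputStream(new BufferedInputStream(Files.newInputStream(run)))) {
          while (in.available() > 0) {
            byte[] r = new byte[in.readInt()];
            in.readFully(r);
            all.add(r);
          }
        }
        Files.delete(run);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    spillRuns.clear();
    all.sort(SpillSorterSketch::compareUnsigned);
    return all;
  }

  public static void main(String[] args) {
    SpillSorterSketch sorter = new SpillSorterSketch(64); // tiny budget forces spills
    Random rnd = new Random(42);
    List<byte[]> expected = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
      byte[] r = new byte[8];
      rnd.nextBytes(r);
      expected.add(r);
      sorter.add(r);
    }
    expected.sort(SpillSorterSketch::compareUnsigned);
    List<byte[]> actual = sorter.sort();
    for (int i = 0; i < expected.size(); i++) {
      if (compareUnsigned(expected.get(i), actual.get(i)) != 0) {
        throw new AssertionError("order mismatch at " + i);
      }
    }
    System.out.println("sorted " + actual.size() + " records across spill runs");
  }
}
```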
### 2. Dynamic Unsigned Lexicographical Byte Encoding
- Generates a sorting key dynamically by converting composite sort columns
into a lexicographically comparable byte array (`byte[]`):
- **Direction (ASC vs DESC)**: ASC columns are encoded in their natural
comparable format. DESC columns are inverted bitwise (`~byte`) to reverse the
unsigned byte comparison order naturally.
- **Null Constraints (`NULLS_FIRST` & `NULLS_LAST`)**: Static prefix header
bytes are chosen so that a standard unsigned comparator places nulls
correctly:
- ASC NULLS_FIRST -> `0x00` / ASC NULLS_LAST -> `0xFF`
- DESC NULLS_FIRST -> `0xFF` / DESC NULLS_LAST -> `0x00`
- **Escape Boundary Protocol**: Strings and byte arrays are mapped to a
deterministic, collision-free escaping sequence (`0x00 -> [0x01, 0x01]`, `0x01
-> [0x01, 0x02]`) terminated by a safe `0x00` byte. This prevents column
boundary bleeding on composite keys (e.g. `"abc"+"def"` vs `"abcdef"+null`).
- **Signed Numbers**: Transforms signed integers, longs, floats, and
doubles into big-endian byte representations with flipped sign bits,
preserving numeric order under unsigned byte comparison.
- **Flexible Timeframes**: Supports `ReadableInstant`,
`java.time.Instant`, and `java.util.Date` conversions, preventing
runner-specific casting crashes.
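The encoding rules above can be illustrated with a small, self-contained sketch. Method names and the single-column scope are hypothetical; the real `IcebergRowSorter` composes keys from the table's full `SortOrder`:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch of order-preserving sort-key byte encoding. */
public class SortKeyEncoderSketch {
  /** Null-placement prefix byte for ASC columns, as listed above. */
  static byte nullHeaderAsc(boolean nullsFirst) {
    return nullsFirst ? (byte) 0x00 : (byte) 0xFF;
  }

  /** Signed int -> big-endian bytes with the sign bit flipped, so unsigned
   *  lexicographic order matches signed numeric order. */
  static byte[] encodeInt(int v) {
    int biased = v ^ 0x80000000; // flip sign bit
    return new byte[] {
      (byte) (biased >>> 24), (byte) (biased >>> 16),
      (byte) (biased >>> 8), (byte) biased
    };
  }

  /** Escape 0x00/0x01 inside variable-length values and terminate with 0x00,
   *  so "abc"+"def" never collides with "abcdef"+"" in a composite key. */
  static byte[] encodeString(String s) {
    byte[] raw = s.getBytes(StandardCharsets.UTF_8);
    byte[] out = new byte[raw.length * 2 + 1];
    int n = 0;
    for (byte b : raw) {
      if (b == 0x00) { out[n++] = 0x01; out[n++] = 0x01; }
      else if (b == 0x01) { out[n++] = 0x01; out[n++] = 0x02; }
      else { out[n++] = b; }
    }
    out[n++] = 0x00; // terminator sorts before any escaped content
    return Arrays.copyOf(out, n);
  }

  /** DESC columns: invert every byte (~byte) to reverse unsigned order. */
  static byte[] invert(byte[] key) {
    byte[] out = new byte[key.length];
    for (int i = 0; i < key.length; i++) out[i] = (byte) ~key[i];
    return out;
  }

  static byte[] concat(byte[] a, byte[] b) {
    byte[] out = Arrays.copyOf(a, a.length + b.length);
    System.arraycopy(b, 0, out, a.length, b.length);
    return out;
  }

  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) return c;
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Signed numeric order survives unsigned byte comparison.
    if (compareUnsigned(encodeInt(-5), encodeInt(3)) >= 0) throw new AssertionError();
    // Byte inversion reverses the order for DESC columns.
    if (compareUnsigned(invert(encodeInt(-5)), invert(encodeInt(3))) <= 0) throw new AssertionError();
    // NULLS_FIRST header sorts below NULLS_LAST header for ASC columns.
    if ((nullHeaderAsc(true) & 0xFF) >= (nullHeaderAsc(false) & 0xFF)) throw new AssertionError();
    // Escaping keeps composite-key column boundaries distinct.
    byte[] k1 = concat(encodeString("abc"), encodeString("def"));
    byte[] k2 = concat(encodeString("abcdef"), encodeString(""));
    if (compareUnsigned(k1, k2) == 0) throw new AssertionError();
    System.out.println("ok");
  }
}
```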
### 3. Shard-Routing and Write-Path Bypasses
- **Dynamic Schema Extraction**: Extracts schemas dynamically from active
row elements at runtime, avoiding null schema references caused by transient
serialization on worker nodes.
- **Direct-Write Bypass**: Inspects the target table's metadata inside
`WriteUngroupedRowsToFiles`. If the table has an active `SortOrder`, it
bypasses direct ungrouped writing and routes elements to the grouped, shuffled
path, where they are partitioned and pre-sorted before writing.
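The two behaviors above reduce to a small decision, sketched here with hypothetical stand-in types and method names; the real logic consults Iceberg table metadata and Beam schemas inside `WriteUngroupedRowsToFiles`:

```java
/** Hypothetical sketch of the write-path routing and schema-capture ideas. */
public class WritePathRouterSketch {
  enum WritePath { DIRECT_UNGROUPED, GROUPED_SORTED }

  /** Tables with an active SortOrder skip the direct ungrouped writer and
   *  take the grouped, shuffled path so rows can be pre-sorted per file. */
  static WritePath choosePath(boolean tableHasActiveSortOrder) {
    return tableHasActiveSortOrder ? WritePath.GROUPED_SORTED : WritePath.DIRECT_UNGROUPED;
  }

  /** Memoized schema capture from the first live element, instead of trusting
   *  a schema serialized with the transform (which can arrive null on workers). */
  static <S> S schemaFromFirstElement(S cached, S schemaOnRow) {
    if (cached != null) return cached;
    if (schemaOnRow == null) throw new IllegalStateException("element carried no schema");
    return schemaOnRow;
  }

  public static void main(String[] args) {
    if (choosePath(true) != WritePath.GROUPED_SORTED) throw new AssertionError();
    if (choosePath(false) != WritePath.DIRECT_UNGROUPED) throw new AssertionError();
    if (!"s1".equals(schemaFromFirstElement(null, "s1"))) throw new AssertionError();
    if (!"s0".equals(schemaFromFirstElement("s0", "s1"))) throw new AssertionError();
    System.out.println("ok");
  }
}
```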
---
## Verification and Test Coverage
### 1. Expanded Unit Tests (`IcebergRowSorterTest.java`)
- **String Collision Proofing**: Asserts that boundary-safe byte escaping
correctly distinguishes and orders complex composite strings.
- **Null Quadrant Matrix**: Validates all 4 null-ordering combinations under
unsigned comparison.
- **Scale and Disk Spill**: Asserts sorting safety and data integrity with
5,000 randomized records under an extremely tight memory bound (a 1 MB
buffer), forcing disk spills.
### 2. End-to-End Pipeline Integration Tests (`IcebergIOWriteTest.java`)
- Appends scrambled datasets to a sorted Iceberg table in a Beam pipeline.
- Implements a sharding-friendly `assertFilesAreInternallySorted`
verification helper which parses individual Parquet files committed to the
table directly using Iceberg scan APIs, ensuring that *each written file* is
perfectly sorted internally, regardless of the runner's sharding factor.
- Tested and validated successfully across `NONE`, `HASH`, and
`HASH_WITH_AUTOSHARDING` distribution modes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]