atognolag opened a new pull request, #38406:
URL: https://github.com/apache/beam/pull/38406

   # Pull Request: High-Performance Sorted Writing Support in Native IcebergIO
   
   ## Description
   This pull request adds sorted-write support to the native `IcebergIO` 
connector (`sdks/java/io/iceberg`).
   
   When an Iceberg table defines a `SortOrder`, writing unsorted data to it 
degrades performance and increases memory overhead, because too many file 
writers are kept open concurrently on each worker ("file thrashing"). This 
change pre-sorts the incoming `PCollection<Row>` elements inside the write 
transform according to the table's active `SortOrder`, so that each written 
Parquet file is internally ordered, worker resources are used more 
efficiently, and fewer file handles are open at the same time.
   
   ---
   
   ## Core Technical Implementation
   
   ### 1. Memory-Safe Spill-to-Disk Sorter (`IcebergRowSorter.java`)
   - Uses Beam's `:sdks:java:extensions:sorter` module (specifically 
`BufferedExternalSorter`).
   - Sorts rows separately for each partition. When the in-memory buffer 
limit is exceeded, the sorter spills elements to the worker's local disk, 
which bounds memory use and guards against out-of-memory (OOM) errors.
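
   `BufferedExternalSorter` is used as a library, so its internals are not 
reproduced here. As a rough, stdlib-only illustration of the buffer-then-spill 
idea, the sketch below keeps key/value pairs in memory until a byte budget is 
exceeded, writes each full buffer to a temporary file as a sorted run, and 
finally merges all runs by unsigned comparison of the `byte[]` keys. 
`SpillingSorterSketch`, `memoryBudgetBytes`, and the method names are 
hypothetical, not Beam or PR APIs.

```java
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.function.BiConsumer;

/** Hypothetical stdlib-only sketch of a spill-to-disk sorter; not Beam's BufferedExternalSorter. */
class SpillingSorterSketch {
  private static final Comparator<byte[][]> BY_KEY = (a, b) -> Arrays.compareUnsigned(a[0], b[0]);

  private final long memoryBudgetBytes;                    // hypothetical in-memory limit
  private final List<byte[][]> buffer = new ArrayList<>(); // in-memory {key, value} pairs
  private final List<Path> runs = new ArrayList<>();       // sorted runs already spilled to disk
  private long bufferedBytes = 0;

  SpillingSorterSketch(long memoryBudgetBytes) { this.memoryBudgetBytes = memoryBudgetBytes; }

  void add(byte[] key, byte[] value) {
    buffer.add(new byte[][] {key, value});
    bufferedBytes += key.length + value.length;
    if (bufferedBytes > memoryBudgetBytes) {
      spill(); // budget exceeded: move the buffer to disk as one sorted run
    }
  }

  int spilledRuns() { return runs.size(); }

  /** Sorts the buffer and writes it to a temp file as: count, then (len, key, len, value)*. */
  private void spill() {
    buffer.sort(BY_KEY);
    try {
      Path run = Files.createTempFile("sort-run-", ".bin");
      run.toFile().deleteOnExit();
      try (DataOutputStream out =
          new DataOutputStream(new BufferedOutputStream(Files.newOutputStream(run)))) {
        out.writeInt(buffer.size());
        for (byte[][] kv : buffer) {
          out.writeInt(kv[0].length); out.write(kv[0]);
          out.writeInt(kv[1].length); out.write(kv[1]);
        }
      }
      runs.add(run);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    buffer.clear();
    bufferedBytes = 0;
  }

  /** Streams every record to the sink in unsigned key order using a k-way merge of all runs. */
  void sortInto(BiConsumer<byte[], byte[]> sink) {
    buffer.sort(BY_KEY);
    List<DataInputStream> open = new ArrayList<>();
    try {
      List<Iterator<byte[][]>> sources = new ArrayList<>();
      for (Path run : runs) {
        DataInputStream in = new DataInputStream(new BufferedInputStream(Files.newInputStream(run)));
        open.add(in);
        sources.add(runIterator(in));
      }
      sources.add(buffer.iterator()); // the unspilled tail is merged like any other run
      // Min-heap holding the current head record of each source.
      PriorityQueue<Head> heap = new PriorityQueue<>((x, y) -> BY_KEY.compare(x.kv, y.kv));
      for (int i = 0; i < sources.size(); i++) {
        if (sources.get(i).hasNext()) heap.add(new Head(sources.get(i).next(), i));
      }
      while (!heap.isEmpty()) {
        Head h = heap.poll();
        sink.accept(h.kv[0], h.kv[1]);
        Iterator<byte[][]> src = sources.get(h.source);
        if (src.hasNext()) heap.add(new Head(src.next(), h.source));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      for (DataInputStream in : open) {
        try { in.close(); } catch (IOException ignored) { }
      }
    }
  }

  /** Lazily reads the records of one spilled run. */
  private static Iterator<byte[][]> runIterator(DataInputStream in) throws IOException {
    int count = in.readInt();
    return new Iterator<byte[][]>() {
      private int read = 0;
      @Override public boolean hasNext() { return read < count; }
      @Override public byte[][] next() {
        try {
          byte[] key = new byte[in.readInt()];
          in.readFully(key);
          byte[] value = new byte[in.readInt()];
          in.readFully(value);
          read++;
          return new byte[][] {key, value};
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    };
  }

  private static final class Head {
    final byte[][] kv;
    final int source;
    Head(byte[][] kv, int source) { this.kv = kv; this.source = source; }
  }
}
```

   Because the merge keeps only one head record per run in the heap, memory 
during the merge scales with the number of runs rather than the number of 
records.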
   
   ### 2. Dynamic Unsigned Lexicographical Byte Encoding
   - Builds a sort key for each row by encoding the table's sort columns into 
a single byte array (`byte[]`) whose unsigned lexicographic order matches the 
table's `SortOrder`:
     - **Direction (ASC vs DESC)**: ASC columns are encoded as-is. For DESC 
columns every encoded byte is inverted (`~byte`), which reverses their 
unsigned comparison order.
     - **Null Ordering (`NULLS_FIRST` & `NULLS_LAST`)**: Each value is preceded 
by a one-byte null marker that is fixed for each combination of direction and 
null ordering, so plain unsigned comparison places nulls correctly:
       - ASC NULLS_FIRST -> `0x00` / ASC NULLS_LAST -> `0xFF`
       - DESC NULLS_FIRST -> `0xFF` / DESC NULLS_LAST -> `0x00`
     - **Escape Boundary Protocol**: Strings and byte arrays are escaped 
deterministically (`0x00 -> [0x01, 0x01]`, `0x01 -> [0x01, 0x02]`) and 
terminated by a `0x00` byte. Since the terminator never occurs inside an 
escaped value and sorts below every escaped byte, a value sorts before any 
longer value it is a prefix of, and adjacent columns of a composite key 
cannot bleed into each other (e.g. `"abc"+"def"` vs `"abcdef"+null`).
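     - **Numeric Types**: Signed integers and longs are encoded big-endian 
with the sign bit flipped, so unsigned byte order matches numeric order. 
Doubles and floats need one more step: negative values must have all of 
their bits inverted, because flipping only the sign bit would order negative 
numbers in reverse.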
     - **Timestamp Types**: Supports `ReadableInstant`, `java.time.Instant`, 
and `java.util.Date` values, so timestamps are encoded whichever 
representation the runner produces, instead of failing with a 
`ClassCastException`.
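
   The encoding rules above can be sketched in plain Java. `SortKeySketch` is 
a hypothetical class, not the PR's encoder. It assumes that the DESC inversion 
is applied to the null marker as well as to the value (which makes the DESC 
rows of the null-marker table come out right), uses `0x01` as the non-null 
marker, widens `int` to `long` and `float` to `double`, and truncates 
timestamps to microseconds. Joda `ReadableInstant` is left out to stay within 
the JDK; it could be handled the same way through its `getMillis()`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Date;

/** Hypothetical sketch of an order-preserving composite sort-key encoder (not the PR's code). */
class SortKeySketch {
  private final ByteArrayOutputStream key = new ByteArrayOutputStream();

  /** Appends one sort column; {@code value} may be null. */
  SortKeySketch add(Object value, boolean descending, boolean nullsFirst) {
    ByteArrayOutputStream col = new ByteArrayOutputStream();
    if (value == null) {
      // Markers before DESC inversion: ASC NULLS_FIRST 0x00, ASC NULLS_LAST 0xFF,
      // DESC NULLS_FIRST 0xFF, DESC NULLS_LAST 0x00 (assumption: inversion applies to the marker too).
      col.write(nullsFirst != descending ? 0x00 : 0xFF);
    } else {
      col.write(0x01); // non-null marker, strictly between the two null markers
      encodeValue(col, value);
    }
    byte[] bytes = col.toByteArray();
    if (descending) {
      for (int i = 0; i < bytes.length; i++) {
        bytes[i] = (byte) ~bytes[i]; // reverses unsigned order for this column only
      }
    }
    key.write(bytes, 0, bytes.length);
    return this;
  }

  byte[] toBytes() {
    return key.toByteArray();
  }

  private static void encodeValue(ByteArrayOutputStream out, Object v) {
    if (v instanceof Integer || v instanceof Long) {
      // Widening int to long preserves order; flipping the sign bit maps signed order to unsigned order.
      writeLong(out, ((Number) v).longValue() ^ Long.MIN_VALUE);
    } else if (v instanceof Float || v instanceof Double) {
      // float -> double is exact. Negatives invert all bits; non-negatives flip only the sign bit.
      long bits = Double.doubleToLongBits(((Number) v).doubleValue());
      writeLong(out, bits < 0 ? ~bits : bits ^ Long.MIN_VALUE);
    } else if (v instanceof Instant) {
      // Truncated to microseconds, the precision of Iceberg's timestamp type.
      Instant t = (Instant) v;
      long micros = Math.addExact(Math.multiplyExact(t.getEpochSecond(), 1_000_000L), t.getNano() / 1_000);
      writeLong(out, micros ^ Long.MIN_VALUE);
    } else if (v instanceof Date) {
      // getTime() also works for java.sql.Date, whose toInstant() throws.
      encodeValue(out, Instant.ofEpochMilli(((Date) v).getTime()));
    } else if (v instanceof String) {
      writeEscaped(out, ((String) v).getBytes(StandardCharsets.UTF_8));
    } else if (v instanceof byte[]) {
      writeEscaped(out, (byte[]) v);
    } else {
      throw new IllegalArgumentException("Unsupported sort column type: " + v.getClass());
    }
  }

  /** Escapes 0x00 and 0x01 so the 0x00 terminator cannot occur inside the value. */
  private static void writeEscaped(ByteArrayOutputStream out, byte[] raw) {
    for (byte b : raw) {
      if (b == 0x00) {
        out.write(0x01);
        out.write(0x01);
      } else if (b == 0x01) {
        out.write(0x01);
        out.write(0x02);
      } else {
        out.write(b);
      }
    }
    out.write(0x00);
  }

  /** Writes a long as 8 big-endian bytes. */
  private static void writeLong(ByteArrayOutputStream out, long bits) {
    for (int shift = 56; shift >= 0; shift -= 8) {
      out.write((int) (bits >>> shift));
    }
  }
}
```

   Because every column encoding is prefix-free (a single null marker, a 
fixed-width number, or a terminated byte string), the columns can simply be 
concatenated and compared with one `Arrays.compareUnsigned` call.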
   
   ### 3. Shard-Routing and Write-Path Bypasses
   - **Dynamic Schema Extraction**: Reads the schema from the incoming row 
elements at runtime instead of from a `transient` field, which would be 
`null` after the transform is deserialized on a worker node.
   - **Direct-Write Bypass**: `WriteUngroupedRowsToFiles` inspects the target 
table's metadata at runtime. If the table has an active `SortOrder`, it skips 
the direct (ungrouped) write and routes the elements to the grouped, 
shuffled path, where they are partitioned and pre-sorted before writing.
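
   Both points can be illustrated with plain Java serialization, which is how 
Beam ships `DoFn` instances to workers. In this hypothetical sketch, 
`RowSketch` stands in for a Beam `Row`, and the `sortedTables` set stands in 
for the Iceberg metadata check (with the Iceberg API this would be something 
like `!table.sortOrder().isUnsorted()`). After the serialization round trip 
the `transient` field is `null`, while the schema carried by each element is 
still available.

```java
import java.io.*;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a Beam Row: every element carries its own schema. */
class RowSketch implements Serializable {
  final List<String> fieldNames;
  final List<Object> values;

  RowSketch(List<String> fieldNames, List<Object> values) {
    this.fieldNames = fieldNames;
    this.values = values;
  }
}

/** Hypothetical stand-in for the ungrouped write step that is serialized and sent to workers. */
class WriteStepSketch implements Serializable {
  enum Route { DIRECT_WRITE, GROUPED_AND_SORTED }

  // Set at pipeline-construction time; transient, so it is null after deserialization.
  transient List<String> constructionTimeSchema;
  // Hypothetical metadata: tables whose SortOrder is not "unsorted".
  private final HashSet<String> sortedTables;

  WriteStepSketch(List<String> schema, Set<String> sortedTables) {
    this.constructionTimeSchema = schema;
    this.sortedTables = new HashSet<>(sortedTables);
  }

  /** Safe on workers: the schema comes from the element itself. */
  List<String> schemaOf(RowSketch row) {
    return row.fieldNames;
  }

  /** Sorted tables skip the direct write and take the shuffled, pre-sorted path. */
  Route route(String table) {
    return sortedTables.contains(table) ? Route.GROUPED_AND_SORTED : Route.DIRECT_WRITE;
  }

  /** Simulates shipping an object to a worker via Java serialization. */
  static <T extends Serializable> T roundTrip(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        @SuppressWarnings("unchecked")
        T copy = (T) in.readObject();
        return copy;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```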
   
   ---
   
   ## Verification and Test Coverage
   
   ### 1. Expanded Unit Tests (`IcebergRowSorterTest.java`)
   - **String Collision Proofing**: Asserts that boundary-safe byte escaping 
correctly distinguishes and orders complex composite strings.
   - **Null Quadrant Matrix**: Validates all four combinations of sort 
direction and null ordering under unsigned comparison.
   - **Scale and Disk Spill**: Sorts 5,000 randomized records with a tight 
1 MB memory buffer to force spilling to disk, and checks ordering and data 
integrity.
   
   ### 2. End-to-End Pipeline Integration Tests (`IcebergIOWriteTest.java`)
   - Appends scrambled datasets to a sorted Iceberg table in a Beam pipeline.
   - Adds an `assertFilesAreInternallySorted` helper that reads each Parquet 
file committed to the table through Iceberg's scan APIs and checks that the 
rows *within each written file* are sorted. Because order across files is not 
checked, the assertion holds regardless of the runner's sharding factor.
   - Validated across the `NONE`, `HASH`, and `HASH_WITH_AUTOSHARDING` 
distribution modes.
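
   The core of a per-file sortedness check can be sketched independently of 
Parquet and Iceberg. This hypothetical helper (not the PR's 
`assertFilesAreInternallySorted`) takes rows already grouped by data file; in 
the real test they would be read through Iceberg's scan APIs. It reports the 
files whose rows are out of order and deliberately ignores order across 
files, since the number and contents of files depend on the runner's 
sharding.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a per-file sortedness check; reading rows from Parquet is out of scope. */
final class PerFileSortCheckSketch {
  private PerFileSortCheckSketch() {}

  /** Returns the names of files whose rows are not in non-decreasing order. */
  static <T> List<String> filesOutOfOrder(Map<String, List<T>> rowsByFile, Comparator<? super T> order) {
    List<String> bad = new ArrayList<>();
    for (Map.Entry<String, List<T>> file : rowsByFile.entrySet()) {
      List<T> rows = file.getValue();
      // Only adjacent rows inside the same file are compared.
      for (int i = 1; i < rows.size(); i++) {
        if (order.compare(rows.get(i - 1), rows.get(i)) > 0) {
          bad.add(file.getKey());
          break;
        }
      }
    }
    return bad;
  }
}
```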
   


-- 
This is an automated message from the Apache Git Service.