atognolag commented on PR #38406:
URL: https://github.com/apache/beam/pull/38406#issuecomment-4406233033

   I have updated this PR to address your (Ahmed) feedback and optimize the 
sorting implementation. Below is a summary of the changes:
   
   **1. Configurable Distribution Modes**
   Introduced configurable Distribution Modes in IcebergIO.WriteRows to manage 
sharding and sorting behaviors before writing:
   
   1. HASH (Default): Shuffles and groups data by partition key ({destination, 
partition}) to consolidate files per partition and prevent cross-worker file 
overlaps. Supports optional dynamic auto-sharding (withAutosharding()) to split 
hot partitions, at the cost of the non-overlapping-files guarantee.
   
   2. RANGE: Shuffles data based on a user-supplied SerializableFunction<Row, 
Integer> (shard function) to distribute hot partitions across parallel sharded 
workers, keeping file min/max key ranges non-overlapping (provided the user 
logic does not randomize shard IDs).
   
   3. NONE: No network shuffle. Best for lightweight pipelines that rely on 
after-the-fact compaction or read-time sort merges.
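
   As an illustration of the RANGE caveat above, here is a minimal sketch of a
shard function that stays deterministic and monotonic in the sort key, so
each shard covers a contiguous key range. It is plain Java and does not use
the Beam types: the class name, the split points, and the `long` key are all
hypothetical. In a real pipeline the function would be a
SerializableFunction<Row, Integer> that first reads the key from the Row.

```java
import java.util.Arrays;
import java.util.function.Function;

/** Hypothetical stand-in for a range shard function; not taken from the PR. */
class RangeShardSketch {
  // Assumed, illustrative split points: keys below 100 go to shard 0,
  // keys in [100, 1000) to shard 1, [1000, 10000) to shard 2, the rest to shard 3.
  private static final long[] SPLIT_POINTS = {100L, 1_000L, 10_000L};

  /** Maps a sort key to a shard ID; larger keys never get a smaller shard ID. */
  static int shardFor(long sortKey) {
    int pos = Arrays.binarySearch(SPLIT_POINTS, sortKey);
    // Found at index i: the key equals a split point and opens shard i + 1.
    // Not found: binarySearch returns -(insertionPoint) - 1, and the
    // insertion point is the shard ID.
    return pos >= 0 ? pos + 1 : -(pos + 1);
  }

  public static void main(String[] args) {
    Function<Long, Integer> shardFn = RangeShardSketch::shardFor;
    for (long key : new long[] {5L, 100L, 999L, 50_000L}) {
      System.out.println(key + " -> shard " + shardFn.apply(key));
    }
  }
}
```

   A function like `key -> random.nextInt(4)` would break this property,
because any shard could then receive any key and the resulting files would
overlap in their min/max ranges.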
   
   **2. Performance Optimizations (GC & CPU Overhead Reductions)**
   Optimized the sorting pipeline in IcebergRowSorter to reduce CPU cycles and 
GC memory pressure:
   
   1. Field-Level Value Conversion: Added beamValueToIcebergValue to 
IcebergUtils. This converts only the specific fields requiring a sort 
transform, completely bypassing the expensive row-to-record conversion 
(beamRowToIcebergRecord) inside the sorting loop.
   2. Stream Reuse: Reuses and resets (.reset()) the ByteArrayOutputStream in 
the sorting loops to eliminate short-lived output stream allocations.
   3. Primitive Serialization: Encodes numeric fields using bitwise shifts 
directly into the output stream, eliminating intermediate ByteBuffer and byte[] 
allocations.
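
   The stream-reuse and primitive-serialization points can be sketched roughly
as follows. This is not the code from IcebergRowSorter: the class and method
names are hypothetical, and the sign-bit flip is an assumption about how a
signed long is made comparable as unsigned bytes.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical sketch of shift-based key encoding with a reused stream. */
class LongKeyEncodingSketch {
  /** Writes a long as 8 big-endian bytes using shifts only (no ByteBuffer, no temp byte[]). */
  static void writeSortableLong(ByteArrayOutputStream out, long value) {
    // Assumption: flipping the sign bit makes unsigned byte-wise comparison
    // agree with signed numeric order.
    long bits = value ^ Long.MIN_VALUE;
    for (int shift = 56; shift >= 0; shift -= 8) {
      out.write((int) (bits >>> shift)); // write(int) keeps only the low 8 bits
    }
  }

  public static void main(String[] args) {
    // Allocated once and reset per record instead of creating a new stream each time.
    ByteArrayOutputStream out = new ByteArrayOutputStream(16);
    long[] values = {-7L, 0L, 42L};
    byte[][] keys = new byte[values.length][];
    for (int i = 0; i < values.length; i++) {
      out.reset(); // clears the contents but keeps the internal buffer
      writeSortableLong(out, values[i]);
      keys[i] = out.toByteArray(); // the only per-record allocation: the key itself
    }
    // Unsigned lexicographic comparison of the keys follows numeric order.
    System.out.println(Arrays.compareUnsigned(keys[0], keys[1]) < 0);
    System.out.println(Arrays.compareUnsigned(keys[1], keys[2]) < 0);
  }
}
```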
   
   **3. Sorter Correctness & Safety**
   1. Null-Ordering Correctness: Corrected the null-ordering prefix byte logic 
for descending sort orders. Prefix bytes now stay the same regardless of sort 
direction (direction is already handled by bitwise-inverting the values), 
which fixes the null-ordering bugs in descending sorts.
   2. Sorting Type Safety: Throws UnsupportedOperationException when attempting 
to sort on complex types (maps, lists, structs) to prevent 
non-Iceberg-compliant sorts.
   3. Deprecated API Removal: Removed the unused, deprecated encodeSortKey 
overload from IcebergRowSorter, since IcebergRowSorter is a package-private 
utility class and the overload is not used elsewhere.
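
   To make the null-ordering rule concrete, here is a minimal sketch under
stated assumptions: a single nullable long field, a one-byte prefix of 0x00 or
0x01, and hypothetical class and method names. It is not the PR's
implementation. The prefix depends only on the null order, and only the value
bytes are inverted for descending sorts.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical sketch: null prefix independent of sort direction. */
class NullPrefixSketch {
  static byte[] encode(Long value, boolean descending, boolean nullsFirst) {
    ByteArrayOutputStream out = new ByteArrayOutputStream(9);
    boolean isNull = value == null;
    // The prefix is chosen from the null order alone and is never inverted:
    // whichever side should come first gets 0x00, the other gets 0x01.
    out.write(isNull == nullsFirst ? 0x00 : 0x01);
    if (!isNull) {
      long bits = value ^ Long.MIN_VALUE; // assumed sign-bit flip for unsigned comparison
      if (descending) {
        bits = ~bits; // direction is handled by inverting the value bits only
      }
      for (int shift = 56; shift >= 0; shift -= 8) {
        out.write((int) (bits >>> shift));
      }
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    // Descending, nulls first: expected order is null, 5, 3.
    byte[] nul = encode(null, true, true);
    byte[] five = encode(5L, true, true);
    byte[] three = encode(3L, true, true);
    System.out.println(Arrays.compareUnsigned(nul, five) < 0);
    System.out.println(Arrays.compareUnsigned(five, three) < 0);
  }
}
```

   If the prefix byte were inverted together with the value for descending
sorts, a "nulls first" order would silently turn into "nulls last", which is
the kind of bug described above.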
   
   **4. Verification & Test Coverage**
   1. Checkstyle & Linter Warnings: Documented all exposed configurations and 
resolved Google Error Prone warnings.
   2. New Test Cases: Added testRangeDistribution to IcebergIOWriteTest to 
verify range-based writes with custom sharding. Added a test verifying 
ValidationException is thrown when trying to construct a sort order with 
complex types.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
