atognolag commented on PR #38406:
URL: https://github.com/apache/beam/pull/38406#issuecomment-4406233033
I have updated this PR to address your feedback, Ahmed, and to optimize the
sorting implementation. Below is a summary of the changes:
**1. Configurable Distribution Modes**
Introduced configurable Distribution Modes in IcebergIO.WriteRows to manage
sharding and sorting behaviors before writing:
1. HASH (default): Shuffles and groups data by partition key ({destination,
partition}) to consolidate files per partition and prevent file overlaps across
workers. Supports optional dynamic auto-sharding (withAutosharding()) to split
hot partitions, at the cost of the non-overlapping-files guarantee.
2. RANGE: Shuffles data based on a user-supplied
SerializableFunction<Row, Integer> (shard function) to distribute hot
partitions across parallel sharded workers while keeping file min/max key
ranges non-overlapping (provided the user logic does not randomize shard IDs).
3. NONE: No network shuffle. Best suited to lightweight pipelines that rely on
after-the-fact compaction or read-time sort merges.
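To make the difference between HASH and RANGE concrete, here is a minimal sketch of the grouping key each mode might shuffle on. It is not the IcebergIO implementation: the class DistributionKeys, its methods, and the string key format are hypothetical, and java.util.function.Function stands in for Beam's SerializableFunction<Row, Integer>. NONE would simply skip the keying step, and auto-sharding is not modeled.

```java
import java.util.function.Function;

/**
 * Hypothetical sketch of the shuffle keys behind the HASH and RANGE modes.
 * Plain strings stand in for IcebergIO's real key types; NONE would skip keying.
 */
class DistributionKeys {

  /** HASH: one key per {destination, partition}, so one worker writes each partition. */
  static String hashKey(String destination, String partition) {
    return destination + "|" + partition;
  }

  /**
   * RANGE: the partition key extended with a shard ID from a user function.
   * java.util.function.Function stands in for Beam's SerializableFunction<Row, Integer>.
   */
  static <T> String rangeKey(
      String destination, String partition, Function<T, Integer> shardFn, T row) {
    return hashKey(destination, partition) + "|shard=" + shardFn.apply(row);
  }

  public static void main(String[] args) {
    // Hypothetical deterministic shard function: buckets sort keys into ranges of width 1000.
    Function<Long, Integer> byRange = sortKey -> (int) Math.floorDiv(sortKey, 1000L);
    String partition = "day=2024-01-01";

    // Both rows share one HASH key but land in different RANGE shards.
    System.out.println(hashKey("db.events", partition)); // prints db.events|day=2024-01-01
    System.out.println(rangeKey("db.events", partition, byRange, 42L)); // ...|shard=0
    System.out.println(rangeKey("db.events", partition, byRange, 1500L)); // ...|shard=1
  }
}
```

Because every row of a partition gets the same HASH key, a single worker writes that partition. Under RANGE, a deterministic shard function that buckets contiguous key values keeps each shard's min/max range disjoint; a function returning random shard IDs would scatter neighboring keys across shards and reintroduce overlapping files.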
**2. Performance Optimizations (GC & CPU Overhead Reductions)**
Optimized the sorting pipeline in IcebergRowSorter to reduce CPU cycles and
GC memory pressure:
1. Field-Level Value Conversion: Added beamValueToIcebergValue to
IcebergUtils. It converts only the fields that require a sort transform,
bypassing the expensive full row-to-record conversion
(beamRowToIcebergRecord) inside the sorting loop.
2. Stream Reuse: Reuses the ByteArrayOutputStream in the sorting loops,
clearing it with .reset(), to eliminate short-lived output stream allocations.
3. Primitive Serialization: Encodes numeric fields directly into the output
stream using bitwise shifts, eliminating intermediate ByteBuffer and byte[]
allocations.
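The stream-reuse and primitive-serialization ideas can be illustrated with a small, self-contained encoder. This is a sketch under assumptions, not IcebergRowSorter's code: SortKeyEncoder and its methods are hypothetical, keys are assumed to be compared as unsigned bytes, and flipping the sign bit (so negative longs sort before positive ones) is a standard order-preserving trick assumed here rather than taken from the PR.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical sketch of an order-preserving sort-key encoder; not IcebergRowSorter's code. */
class SortKeyEncoder {
  // One buffer reused for every row; reset() rewinds it without reallocating the backing array.
  private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(64);

  /** Writes a long big-endian using shifts, with the sign bit flipped so negatives sort first. */
  static void writeLong(ByteArrayOutputStream out, long value) {
    long bits = value ^ Long.MIN_VALUE;
    for (int shift = 56; shift >= 0; shift -= 8) {
      out.write((int) (bits >>> shift)); // write(int) keeps only the low 8 bits
    }
  }

  /** Reads back a long written by writeLong, starting at the given offset. */
  static long readLong(byte[] key, int offset) {
    long bits = 0;
    for (int i = 0; i < 8; i++) {
      bits = (bits << 8) | (key[offset + i] & 0xFFL);
    }
    return bits ^ Long.MIN_VALUE;
  }

  /** Encodes one row's numeric sort fields; returns a copy because the buffer is reused. */
  byte[] encode(long... sortFields) {
    buffer.reset();
    for (long field : sortFields) {
      writeLong(buffer, field);
    }
    return buffer.toByteArray();
  }

  public static void main(String[] args) {
    SortKeyEncoder encoder = new SortKeyEncoder();
    long[] values = {42L, -7L, 0L, Long.MAX_VALUE, Long.MIN_VALUE};
    byte[][] keys = new byte[values.length][];
    for (int i = 0; i < values.length; i++) {
      keys[i] = encoder.encode(values[i]);
    }
    // Unsigned lexicographic byte order matches signed numeric order.
    Arrays.sort(keys, (a, b) -> Arrays.compareUnsigned(a, b));
    long[] decoded = new long[keys.length];
    for (int i = 0; i < keys.length; i++) {
      decoded[i] = readLong(keys[i], 0);
    }
    System.out.println(Arrays.toString(decoded));
    // prints [-9223372036854775808, -7, 0, 42, 9223372036854775807]
  }
}
```

In this sketch toByteArray() still copies the buffer once per key; what it avoids is a new stream per row and any intermediate ByteBuffer or byte[] for the number itself.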
**3. Sorter Correctness & Safety**
1. Null-Ordering Correctness: Corrected the null-ordering prefix byte logic
for descending sort orders. Prefix bytes now stay the same regardless of sort
direction (direction is already handled by bitwise-inverting the encoded
values), which resolves the null-ordering bugs in descending sorts.
2. Sorting Type Safety: Throws UnsupportedOperationException when attempting
to sort on complex types (maps, lists, structs), preventing sorts that are not
Iceberg-compliant.
3. Deprecated API Removal: Removed the unused, deprecated encodeSortKey
overload from IcebergRowSorter; the class is a package-private utility and
the overload was not used elsewhere.
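The null-ordering fix can be sketched the same way: each field is written as one prefix byte chosen only by the null order, followed by the value bytes, which are bitwise-inverted for descending order. Everything here is a hypothetical illustration (NullAwareFieldEncoder, its enums, and the 0x00/0x01 prefix values are assumptions). It handles only Long values and rejects collections and maps with UnsupportedOperationException to mirror the type check described above.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of direction-independent null prefixes; not IcebergRowSorter's code. */
class NullAwareFieldEncoder {
  enum Direction { ASC, DESC }

  enum NullOrder { NULLS_FIRST, NULLS_LAST }

  /** Appends one Long field: a null-order prefix byte, then value bytes inverted for DESC. */
  static void encodeField(ByteArrayOutputStream out, Object value, Direction dir, NullOrder nulls) {
    if (value instanceof Collection || value instanceof Map) {
      throw new UnsupportedOperationException(
          "Cannot sort on complex type " + value.getClass().getSimpleName());
    }
    boolean isNull = value == null;
    // The prefix depends only on the null order and is never inverted for DESC.
    boolean sortsFirst = isNull == (nulls == NullOrder.NULLS_FIRST);
    out.write(sortsFirst ? 0x00 : 0x01);
    if (isNull) {
      return;
    }
    long bits = ((Long) value) ^ Long.MIN_VALUE; // order-preserving as unsigned bytes
    if (dir == Direction.DESC) {
      bits = ~bits; // direction is handled by inverting the value bytes only
    }
    for (int shift = 56; shift >= 0; shift -= 8) {
      out.write((int) (bits >>> shift));
    }
  }

  /** Encodes a single value into a fresh key. */
  static byte[] key(Long value, Direction dir, NullOrder nulls) {
    ByteArrayOutputStream out = new ByteArrayOutputStream(9);
    encodeField(out, value, dir, nulls);
    return out.toByteArray();
  }

  /** Sorts values by comparing their encoded keys as unsigned bytes. */
  static List<Long> sorted(List<Long> values, Direction dir, NullOrder nulls) {
    List<Long> result = new ArrayList<>(values);
    result.sort((a, b) -> Arrays.compareUnsigned(key(a, dir, nulls), key(b, dir, nulls)));
    return result;
  }

  public static void main(String[] args) {
    List<Long> input = Arrays.asList(3L, null, -1L, 10L);
    System.out.println(sorted(input, Direction.ASC, NullOrder.NULLS_FIRST)); // [null, -1, 3, 10]
    System.out.println(sorted(input, Direction.DESC, NullOrder.NULLS_LAST)); // [10, 3, -1, null]
  }
}
```

In this sketch, inverting the prefix byte along with the value bytes would move nulls to the wrong end for DESC fields; keeping the prefix direction-independent avoids that.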
**4. Verification & Test Coverage**
1. Checkstyle & Linter Warnings: Documented all exposed configurations and
resolved Google Error Prone warnings.
2. New Test Cases: Added testRangeDistribution to IcebergIOWriteTest to
verify range-based writes with custom sharding, and added a test verifying
that ValidationException is thrown when constructing a sort order on
complex types.