raghavmahajan opened a new issue, #14249: URL: https://github.com/apache/iceberg/issues/14249
### Feature Request / Improvement

## Problem

When using the `create_changelog_view` procedure with `net_changes=true` to remove intermediate changes across multiple snapshots, the current implementation is computationally expensive. The operation performs a **repartition and sort on ALL columns** in the table, which becomes a significant bottleneck for wide or large tables. See https://github.com/apache/iceberg/blob/36bb82675ff68ac0ed059d4db62550d30aa35760/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java#L220-L227

This is not the case for the `compute_updates` path, where the repartition is performed only on the identifier fields plus `change_ordinal`.

### Current Limitation

The `net_changes=true` path does not support identifier columns:

- If the table has identifier columns defined in its schema, they are ignored when `net_changes=true` is used
- If identifier columns are explicitly provided as a parameter together with `net_changes=true`, the procedure fails

This forces users to choose between:

1. Getting net changes (expensive all-column repartition)
2. Using identifier columns for optimization (but only getting per-snapshot updates)

## Proposed Solution

### Expected Behavior

Allow `net_changes=true` to work with identifier columns:

| net_changes | identifier_columns | compute_updates | Behavior |
| ----------- | ------------------ | --------------- | -------- |
| false | not provided | false | Remove carryovers only (existing) |
| false | provided | true (auto) | Per-snapshot pre/post images (existing) |
| true | not provided | false | Net changes via all-column repartition (existing) |
| true | provided | true | Net changes with pre/post images across the entire snapshot range (NEW - currently fails) |

### Implementation Details

We implement `ComputeNetUpdates`, which combines `net_changes=true` with identifier columns:

1. Repartition by `identifier_columns` and sort within each partition by `identifier_columns + change_ordinal`
2. Use window functions to identify the first and last change for each logical row
3. Filter to keep only the first and last change (by `change_ordinal`) for each logical row
4. Apply the `RemoveCarryoverRows` iterator to this reduced dataset
5. Compute pre/post images from DELETE-INSERT pairs (similar to the existing `ComputeUpdateIterator`)

This mirrors the existing `compute_updates` approach ([see](https://github.com/apache/iceberg/blob/36bb82675ff68ac0ed059d4db62550d30aa35760/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java#L182)), but operates across the entire snapshot range (like `net_changes`) rather than per snapshot.

### Performance Impact

- Reduced shuffle data volume: only the identifier columns (typically 1-3) are used for partitioning, instead of all columns (potentially dozens)
- Smaller sort keys: sorting on fewer columns is substantially faster
- Less data to process: the window function pre-filters rows before the final iterator logic runs

This is particularly helpful for tables with:

- Wide schemas (20+ columns)
- High-cardinality data
- Multiple snapshots with many intermediate changes

### Query engine

Spark

### Willingness to contribute

- [x] I can contribute this improvement/feature independently
- [ ] I would be willing to contribute this improvement/feature with guidance from the Iceberg community
- [ ] I cannot contribute this improvement/feature at this time

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
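The five implementation steps above can be sketched in plain Python on a tiny in-memory changelog. This is a hypothetical simplification for illustration only: `Row`, `compute_net_updates`, and the string change types are stand-ins, not the Iceberg or Spark API, and the real implementation would use Spark repartitioning and window functions rather than Python dictionaries.

```python
from collections import defaultdict, namedtuple

# Hypothetical sketch of the proposed ComputeNetUpdates logic (steps 1-5
# from the issue); NOT the actual Iceberg/Spark implementation.
# A changelog row: identifier value, data payload, change type, and the
# ordinal of the snapshot that produced it.
Row = namedtuple("Row", ["ident", "data", "kind", "ordinal"])

def compute_net_updates(rows):
    # Step 1: group by identifier (stands in for repartitioning by
    # identifier_columns in Spark)...
    groups = defaultdict(list)
    for row in rows:
        groups[row.ident].append(row)

    net = []
    for ident in sorted(groups):
        # ...and sort within each group by change_ordinal
        changes = sorted(groups[ident], key=lambda r: r.ordinal)
        # Steps 2-3: only the first and last change of a logical row can
        # contribute to the net result across the snapshot range
        first, last = changes[0], changes[-1]
        if first.kind == "INSERT" and last.kind == "DELETE":
            continue  # row created and removed within the range: net no-op
        if first.kind == "DELETE" and last.kind == "INSERT":
            # Step 4: a DELETE/INSERT pair with identical data is a carryover
            if first.data == last.data:
                continue
            # Step 5: a surviving DELETE/INSERT pair yields pre/post images
            net.append((ident, first.data, "UPDATE_BEFORE"))
            net.append((ident, last.data, "UPDATE_AFTER"))
        elif first.kind == "INSERT":
            net.append((ident, last.data, "INSERT"))  # net insert of final value
        else:
            net.append((ident, first.data, "DELETE"))  # net delete of original value
    return net

log = [
    Row(1, "a", "INSERT", 0),                            # new row...
    Row(1, "a", "DELETE", 1), Row(1, "b", "INSERT", 1),  # ...then updated
    Row(2, "x", "DELETE", 2), Row(2, "x", "INSERT", 2),  # carryover: dropped
    Row(3, "p", "DELETE", 0), Row(3, "q", "INSERT", 1),  # net update p -> q
    Row(4, "z", "INSERT", 0), Row(4, "z", "DELETE", 1),  # insert+delete: dropped
]
print(compute_net_updates(log))
# [(1, 'b', 'INSERT'), (3, 'p', 'UPDATE_BEFORE'), (3, 'q', 'UPDATE_AFTER')]
```

Note how sorting each group by only the identifier and `change_ordinal` is what the proposal exploits: the per-row decision never needs the non-identifier columns until the final carryover/pre-post comparison, which is why the all-column shuffle can be avoided.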
