ayushk7102 opened a new issue, #3129:
URL: https://github.com/apache/iceberg-python/issues/3129

   ### Apache Iceberg version
   
   None
   
   ### Please describe the bug 🐞
   
   CC [@goutamvenkat-anyscale](https://github.com/goutamvenkat-anyscale)
   
   Hello! We are implementing distributed writes from Ray Data to Iceberg. As 
part of upserts, we:
   
   1. Write data files in parallel across Ray workers (each worker writes its 
share of Parquet files directly to storage and returns `DataFile` metadata + 
the upsert key columns back to the driver)
   2. On the driver, concatenate all upsert keys collected from workers, call 
`create_match_filter` to build a delete predicate, then call `txn.delete()` 
followed by an append to commit
   
   Upserting 1M rows (383 MiB) into an Iceberg table takes **~17.5 minutes**, 
almost entirely in the delete step:
   
   ```
   create_match_filter (1M keys → In filter):   10.26s
   txn.delete():                              1054.35s
   append + commit:                              1.14s
   ─────────────────────────────────────────────────────
   Total upsert commit:                       1065.75s
   ```
   
   PyIceberg version: `0.11.0`.
   
   This matches what's reported in #2159 and #2138.
   
   The bottlenecks are:
   1. **`create_match_filter`** — constructs a Python `BooleanExpression` node 
per row, which is expensive at 1M+ keys
   2. **`txn.delete()`** — evaluates the resulting giant `In` expression 
against the table's data files with no partition pruning, effectively doing a 
full table scan
   
   We have a few questions:
   
   1. **Merge-on-read upserts** — is this on the roadmap, and if so, roughly 
when? MoR would let us avoid the expensive delete + rewrite cycle entirely for 
large upserts.
   2. **Optimizing `create_match_filter` or `txn.delete()`** — is there a 
recommended way to speed these up today? For example, batching the `In` filter, 
or passing a partition-level hint to constrain the file scan?
   3. **Partition-aware deletes** — if the upsert key columns overlap with 
partition columns, is there a supported way to restrict `txn.delete()` to only 
the relevant partitions, rather than scanning the full table?
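   To frame questions 2 and 3 concretely, here is a minimal sketch of what we mean by batching and partition scoping. All names are hypothetical, and the PyIceberg calls are left as comments since they need a live table:

   ```python
   # Hypothetical mitigation sketches; only the grouping/chunking logic runs here.
   from collections import defaultdict

   def chunk_keys(keys, chunk_size=50_000):
       """Question 2: split the key set so each delete carries a bounded In filter."""
       for start in range(0, len(keys), chunk_size):
           yield keys[start:start + chunk_size]

   def group_keys_by_partition(rows):
       """Question 3: bucket (partition_value, key) pairs so each delete can be
       scoped with an explicit partition predicate."""
       groups = defaultdict(list)
       for part, key in rows:
           groups[part].append(key)
       return dict(groups)

   # for part, keys in group_keys_by_partition(rows).items():
   #     for batch in chunk_keys(keys):
   #         # With pyiceberg.expressions (And, EqualTo, In), a predicate like
   #         # this would let scan planning prune to the matching partition:
   #         txn.delete(And(EqualTo("event_date", part), In("id", batch)))
   ```

   It is unclear to us whether chunking alone helps, since each `txn.delete()` presumably plans its own file scan; hence question 3 about constraining that scan.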
   
   ## Related
   
   - #2159 — Upserting large table extremely slow
   - #2138 — Upsertion memory usage grows exponentially as table size grows
   - #2943 — Optimize upsert performance for large datasets
   
   
   ### Willingness to contribute
   
   - [ ] I can contribute a fix for this bug independently
   - [ ] I would be willing to contribute a fix for this bug with guidance from 
the Iceberg community
   - [ ] I cannot contribute a fix for this bug at this time


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

