qzyu999 commented on issue #1092:
URL: 
https://github.com/apache/iceberg-python/issues/1092#issuecomment-3988656493

   Hi @kevinjqliu, thanks for the guidance. I've taken a look at the Java/Spark 
code (referencing v3.5), and I can see that the existing Java implementation has 
quite a few options and nuances. I believe the goal for this issue is an MVP 
along the lines of what @sungwy described. I went through the latest 
iceberg-python main branch to see what we already have and what is still 
missing for such an MVP.
   
   We already have major components such as:
   - `table.scan(filter).plan_files()` (in `pyiceberg/table/__init__.py` for 
filtering based on a predicate as described in the OP by @sungwy)
   - Expression types (in `pyiceberg/expressions`)
   - `ListPacker` (in `pyiceberg/utils/bin_packing.py` for bin-packing the 
files together into manageable sizes)
   - various `pyarrow` functions to handle a single-node/python-only MVP setup 
(located in `pyiceberg/io/pyarrow.py`)
   - `_OverwriteFiles` for the commit (in `pyiceberg/table/update/snapshot.py`)
   - `MaintenanceTable` class (in `pyiceberg/table/maintenance.py`), where we 
can add a `rewrite_data_files(table, row_filter)` function
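   To illustrate the bin-packing component, here is a deliberately simplified, greedy sketch of what `ListPacker` does conceptually. The real implementation in `pyiceberg/utils/bin_packing.py` also supports lookback; `pack_files` and its parameters are hypothetical names, for illustration only:

```python
from typing import List


def pack_files(file_sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily group file sizes into bins whose totals approach target_size.

    Simplified first-fit sketch; pyiceberg's ListPacker is more sophisticated.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in sorted(file_sizes, reverse=True):
        # close the current bin if adding this file would overshoot the target
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Ten 10 MB files packed toward a 32 MB target: three full groups plus a remainder
print(pack_files([10] * 10, target_size=32))
# → [[10, 10, 10], [10, 10, 10], [10, 10, 10], [10]]
```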
   
   I've tested some code locally, and I'm able to do the following:
   1. generate a partitioned table with many small files added across multiple 
snapshots
   2. then scan the table given a predicate filter to find all the needed data 
files
     2a. This also handles the edge case (as seen in `groupByPartition()` from 
`BinPackRewriteFilePlanner.java`) where, after partition spec evolution, data 
files written under an old partition spec must be treated as their own 
partition group and rewritten separately.
   3. process each partition (similar to `planFileGroups` in 
`BinPackRewriteFilePlanner.java`) using the size-threshold constants from the 
Java `SizeBasedFileRewritePlanner` - this produces the list of all files across 
partitions that need to be rewritten, and it also works when `row_filter` 
selects a specific subset of rows rather than rewriting everything
     3a. filter files by size
     3b. bin-pack the files
     3c. filter groups (like in `filterFileGroups` from 
`BinPackRewriteFilePlanner.java`)
   4. create a list of both the old data files and the new data files (based on 
the bin-packing etc.)
   5. create a transaction to 1) delete the old files and 2) append the new 
files in a single commit
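   Putting steps 3a-3c together, here is a rough plain-Python sketch of the per-partition planning. The function name, structure, and threshold values are placeholders I made up for illustration, not PyIceberg's API or actual defaults:

```python
from typing import Dict, List

TARGET_FILE_SIZE = 512                      # illustrative units, not real defaults
MIN_FILE_SIZE = int(TARGET_FILE_SIZE * 0.75)
MAX_FILE_SIZE = int(TARGET_FILE_SIZE * 1.80)
MIN_INPUT_FILES = 2                         # skip groups with fewer files than this


def plan_file_groups(files_by_partition: Dict[str, List[int]]) -> Dict[str, List[List[int]]]:
    planned: Dict[str, List[List[int]]] = {}
    for partition, sizes in files_by_partition.items():
        # 3a. only rewrite files outside the [MIN, MAX] size window
        candidates = [s for s in sizes if s < MIN_FILE_SIZE or s > MAX_FILE_SIZE]
        # 3b. bin-pack the candidates toward the target size (greedy sketch)
        groups: List[List[int]] = []
        current: List[int] = []
        total = 0
        for size in sorted(candidates, reverse=True):
            if current and total + size > TARGET_FILE_SIZE:
                groups.append(current)
                current, total = [], 0
            current.append(size)
            total += size
        if current:
            groups.append(current)
        # 3c. keep only groups that reduce file count or split an oversized file
        kept = [g for g in groups if len(g) >= MIN_INPUT_FILES or sum(g) > TARGET_FILE_SIZE]
        if kept:
            planned[partition] = kept
    return planned


# p=1 has three small files worth compacting; p=2's file is already well-sized
print(plan_file_groups({"p=1": [100, 100, 100, 500], "p=2": [450]}))
# → {'p=1': [[100, 100, 100]]}
```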
   
   One major nuance remains for this first MVP: `SizeBasedFileRewritePlanner.java` 
has an `expectedOutputFiles()` algorithm that handles the "remainder" problem 
(e.g., without it we might write 10 large files plus 1 small file, rather than 
10 files that are each slightly larger). PyIceberg instead uses 
`bin_pack_arrow_table()` in `pyiceberg/io/pyarrow.py`, which does a streaming, 
real-time packing rather than an up-front planned optimization that could 
produce better-sized output files. However, I feel it isn't needed for this 
initial MVP.
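   For reference, here is my reading of that remainder logic transcribed into Python. This is a hedged sketch based on the Java code, not an actual PyIceberg API; the parameter names are simplified:

```python
import math


def num_output_files(input_size: int, target_size: int, min_size: int, max_size: int) -> int:
    """Decide how many output files a group should produce (sketch of the
    logic I read in SizeBasedFileRewritePlanner.numOutputFiles)."""
    if input_size < target_size:
        return 1
    with_remainder = math.ceil(input_size / target_size)
    without_remainder = input_size // target_size
    avg_without_remainder = input_size / without_remainder
    if input_size % target_size > min_size:
        # the leftover is big enough to stand as its own file
        return with_remainder
    if avg_without_remainder < min(1.1 * target_size, max_size):
        # distribute the small leftover across the other files instead
        return without_remainder
    return with_remainder


# 10 files' worth of data plus a tiny 10-unit tail: write 10 slightly
# larger files instead of 10 target-sized files and 1 tiny one
print(num_output_files(10 * 512 + 10, target_size=512, min_size=384, max_size=921))
# → 10
```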
   
   What are your thoughts? Is it okay to proceed and create a PR for this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

