qzyu999 commented on issue #1092:
URL:
https://github.com/apache/iceberg-python/issues/1092#issuecomment-3988656493
Hi @kevinjqliu, thanks for the guidance. I've taken a look at the Java/Spark
code (referencing v3.5), and I can see that the existing Java implementation
has quite a few options and nuances. I believe the goal for this issue is to
focus on some sort of MVP as described by @sungwy. Working against the latest
iceberg-python main branch, I reviewed what we already have and what is still
missing for a potential MVP.
We already have major components such as:
- `table.scan(filter).plan_files()` (in `pyiceberg/table/__init__.py` for
filtering based on a predicate as described in the OP by @sungwy)
- Expression types (in `pyiceberg/expressions`)
- `ListPacker` (in `pyiceberg/utils/bin_packing.py` for bin-packing the
files together into manageable sizes)
- various `pyarrow` functions to handle a single-node/python-only MVP setup
(located in `pyiceberg/io/pyarrow.py`)
- `_OverwriteFiles` for the commit (in `pyiceberg/table/update/snapshot.py`)
- a `MaintenanceTable` class (in `pyiceberg/table/maintenance.py`) where we can
add a `rewrite_data_files(table, row_filter)` function
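To make the discussion concrete, here is a rough sketch of an options holder for the proposed `rewrite_data_files` entry point. All names and default values below are assumptions for discussion only, loosely modeled on the Java `SizeBasedFileRewritePlanner` options, not a final API:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RewriteDataFilesOptions:
    """Hypothetical options holder; names and defaults are assumptions,
    loosely modeled on the Java SizeBasedFileRewritePlanner."""

    target_file_size_bytes: int = 512 * 1024 * 1024
    min_file_size_ratio: float = 0.75  # files below target * ratio are rewrite candidates
    max_file_size_ratio: float = 1.80  # files above target * ratio are rewrite candidates
    min_input_files: int = 5           # skip groups with fewer files than this

    @property
    def min_file_size_bytes(self) -> int:
        return int(self.target_file_size_bytes * self.min_file_size_ratio)

    @property
    def max_file_size_bytes(self) -> int:
        return int(self.target_file_size_bytes * self.max_file_size_ratio)
```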
I've tested some code locally, and I'm able to do the following:
1. generate a partitioned table with many small files added across multiple
snapshots
2. then scan the table given a predicate filter to find all the needed data
files
2a. This also handles the edge case (as seen in `groupByPartition()` from
`BinPackRewriteFilePlanner.java`) where, after partition spec evolution, data
files written under an older spec must be treated as a separate partition
group and rewritten on their own.
3. process each partition (similar to `planFileGroups` in
`BinPackRewriteFilePlanner.java`) using the constants from the Java
`SizeBasedFileRewritePlanner`. This effectively builds the list of all files
across partitions that need to be rewritten, and it also works when selecting
a specific set of rows via `row_filter` rather than automatically rewriting
everything:
3a. filter files by size
3b. bin-pack the files
3c. filter groups (like in `filterFileGroups` from
`BinPackRewriteFilePlanner.java`)
4. create a list of both the old data files and the new data files (based on
the bin-packing etc.)
5. create a transaction that 1) deletes the old files and 2) appends the new
files in a single commit
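Steps 3a-3c above could be sketched roughly as follows. This is a pure-Python illustration; the `FileRef` and `plan_rewrite_groups` names are made up for this sketch, and the real implementation would work on `plan_files()` results and use `ListPacker` from `pyiceberg/utils/bin_packing.py`:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FileRef:
    """Stand-in for a planned data file (path + on-disk size in bytes)."""
    path: str
    size: int


def plan_rewrite_groups(
    files: List[FileRef],
    target: int,
    min_ratio: float = 0.75,
    max_ratio: float = 1.80,
    min_input_files: int = 2,  # illustrative only; the Java default differs
) -> List[List[FileRef]]:
    # 3a. keep only files outside the acceptable size window
    #     (splitting oversized files is out of scope for this sketch)
    candidates = [
        f for f in files
        if f.size < target * min_ratio or f.size > target * max_ratio
    ]
    # 3b. greedy sequential bin-packing up to the target size
    #     (the real code would delegate this to pyiceberg's ListPacker)
    groups: List[List[FileRef]] = []
    current: List[FileRef] = []
    current_size = 0
    for f in candidates:
        if current and current_size + f.size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(f)
        current_size += f.size
    if current:
        groups.append(current)
    # 3c. drop groups too small to be worth rewriting
    return [g for g in groups if len(g) >= min_input_files]
```

Step 5 would then, for each group, rewrite the grouped files via the `pyarrow` helpers and commit the delete + append as one transaction.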
There's one major nuance for this first MVP: `SizeBasedFileRewritePlanner.java`
has an `expectedOutputFiles()` method with an algorithm for the "remainder"
problem (e.g., instead of writing 10 target-sized files plus 1 small leftover
file, it distributes the remainder so that each of the 10 files is slightly
larger). PyIceberg instead uses `bin_pack_arrow_table()` in
`pyiceberg/io/pyarrow.py`, which does a more real-time (streaming) packing
rather than a planned optimization, so the resulting file sizes can be less
optimal. However, I feel that's acceptable for this initial MVP.
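For reference, here is a simplified illustration of the remainder idea. This is not the exact `expectedOutputFiles()` algorithm from the Java planner, just a sketch of the trade-off:

```python
def planned_output_file_count(total_size: int, target: int, min_ratio: float = 0.75) -> int:
    """Simplified sketch of remainder handling; NOT the exact
    expectedOutputFiles() algorithm from the Java planner."""
    if total_size <= target:
        return 1
    full_files, remainder = divmod(total_size, target)
    if remainder == 0:
        return full_files
    # An undersized leftover is folded into the other files (each ends up
    # slightly above the target) rather than written as a small straggler.
    if remainder < target * min_ratio:
        return full_files
    return full_files + 1
```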
What are your thoughts? Is it okay to proceed and create a PR for this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]