jqin61 commented on issue #208:
URL: https://github.com/apache/iceberg-python/issues/208#issuecomment-1901345425
Based on the existing discussion, there are 3 major possible directions for
detecting partitions and writing each partition in a multi-threaded way to
maximize I/O. It seems there isn't any approach simple enough that we could
implement purely on top of the existing PyArrow APIs from within PyIceberg. I
list and compare these approaches for discussion purposes:
**Filter Out Partitions**
As Fokko suggested, we could filter the table to get the partitions before
writing, but we would need an API on the Arrow side to get the unique partition
values (e.g. extend compute.unique() from array/scalar to table).
```
partitions: list[dict] = pyarrow.compute.unique(arrow_table)
```
With it, we could filter the table to get partitions and provide them as
inputs to concurrent jobs.
```
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

arrow_table = pa.table({
    'column1': ['A', 'B', 'A', 'C', 'B'],
    'column2': [1, 2, 1, 3, 2],
    'column3': ['X', 'Y', 'X', 'Z', 'Y']
})
partition_keys = arrow_table.select(['column1', 'column2'])

# The existing unique() does not support tables; we would need to add an API on the Arrow side.
partitions: list[dict] = pyarrow.compute.unique(partition_keys)
# = [
#     {'column1': 'A', 'column2': 1},
#     {'column1': 'B', 'column2': 2},
#     {'column1': 'C', 'column2': 3},
# ]

def filter_and_write_table(partition, index):
    # Create a boolean mask for rows that match the partition values
    mask = reduce(pc.and_, (pc.equal(arrow_table[col], val) for col, val in partition.items()))
    # Filter the table
    filtered_table = arrow_table.filter(mask)
    # Write the filtered table to a Parquet file
    pq.write_table(filtered_table, f'output_partition_{index}.parquet')

with ThreadPoolExecutor() as executor:
    for i, partition in enumerate(partitions):
        executor.submit(filter_and_write_table, partition, i)
```
**Sort and Single-direction Writing**
As Fokko suggested, we could sort the table first, then slice it and do a
one-direction scan over each slice to write out the partitioned files.
Suppose we had an Arrow API that takes a sorted table, scans through it,
creates a new file whenever it encounters a row with a new partition value,
and raises an error if it encounters a partition value it has already passed,
just like how Spark writes to an Iceberg table.
```
def write_table_partitions(sorted_table, partition_columns, dir): ...
```
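To make the intended behavior concrete, here is a minimal Python sketch of what such an API might do under the hood, assuming the input table is sorted/clustered by its partition columns. The function name mirrors the proposed signature above; the file naming and error behavior are illustrative assumptions, and a real implementation would presumably live on the Arrow side:
```
import os
import pyarrow as pa
import pyarrow.parquet as pq

def write_table_partitions(sorted_table: pa.Table, partition_columns: list[str], dir: str) -> None:
    # Single-direction scan: start a new file at every partition boundary and
    # fail if a partition value reappears after we have already moved past it.
    seen_keys: set[tuple] = set()
    current_key = None
    start = 0
    key_columns = [sorted_table[col].to_pylist() for col in partition_columns]
    for row_id, key in enumerate(zip(*key_columns)):
        if key == current_key:
            continue
        if key in seen_keys:
            raise ValueError(f"Input table is not clustered by partition: {key} reappeared")
        if current_key is not None:
            # Zero-copy slice of the rows belonging to the previous partition
            pq.write_table(sorted_table.slice(start, row_id - start),
                           os.path.join(dir, f'partition_{len(seen_keys) - 1}.parquet'))
        seen_keys.add(key)
        current_key = key
        start = row_id
    if current_key is not None:
        # Flush the last partition
        pq.write_table(sorted_table.slice(start),
                       os.path.join(dir, f'partition_{len(seen_keys) - 1}.parquet'))
```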
Then we could do
```
partition_columns = ['column1', 'column2']
sorted_table = arrow_table.sort_by([('column1', 'ascending'), ('column2', 'ascending')])
directory_path = '/path/to/output/directory'

# Break the sorted table down into slices with zero-copy (slice_table is a placeholder)
slices = slice_table(sorted_table, slice_options)

# Call the API on each slice concurrently
with ThreadPoolExecutor() as executor:
    for table_slice in slices:
        executor.submit(write_table_partitions, table_slice, partition_columns, directory_path)
```
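`slice_table` and `slice_options` above are placeholders. One simple sketch, assuming we just cut the sorted table into fixed-size chunks, relies on `Table.slice`, which is zero-copy:
```
import pyarrow as pa

def slice_table(sorted_table: pa.Table, rows_per_slice: int) -> list[pa.Table]:
    # Zero-copy slicing of the sorted table into fixed-size chunks.
    # Cutting exactly at partition boundaries instead would avoid two threads
    # writing files for the same partition, at the cost of an extra scan.
    return [
        sorted_table.slice(offset, rows_per_slice)
        for offset in range(0, len(sorted_table), rows_per_slice)
    ]
```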
**Bucketing**
We could create an Arrow API that returns the partitioned tables/record batches
given a table and a list of partition columns, where the algorithm does a full
scan of the Arrow table in O(table_length) time, bucket-sorts it, and creates a
table/record batch for each bucket:
```
table_partitions = pyarrow.compute.partition(arrow_table, partition_columns)
```
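To illustrate the intended semantics, here is a slow pure-Python sketch of what such a `partition()` utility would return (the function name is hypothetical; the real value would come from doing this single scan natively in Arrow):
```
from collections import defaultdict
import pyarrow as pa

def partition_table(table: pa.Table, partition_columns: list[str]) -> dict[tuple, pa.Table]:
    # One O(table_length) pass to bucket row ids by partition key,
    # then one take() per bucket to materialize each partition.
    buckets: dict[tuple, list[int]] = defaultdict(list)
    key_columns = [table[col].to_pylist() for col in partition_columns]
    for row_id, key in enumerate(zip(*key_columns)):
        buckets[key].append(row_id)
    return {key: table.take(row_ids) for key, row_ids in buckets.items()}
```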
We could write each batch:
```
import os

def write_table_to_parquet(table, directory_path, file_suffix):
    # Construct the full path for the Parquet file
    file_path = os.path.join(directory_path, f'record_batch_{file_suffix}.parquet')
    # Write the table to a Parquet file
    pq.write_table(table, file_path)

with ThreadPoolExecutor() as executor:
    for i, partition_as_table in enumerate(table_partitions):
        executor.submit(write_table_to_parquet, partition_as_table, directory_path, i)
```
As Fokko pointed out, the filter method will not be efficient when there are
many partitions: each filter takes O(table_length) time, and although each
thread can run its filter independently, on a single node the total work
across all jobs is O(table_length * number_of_partitions). Technically, a
single scan is enough to produce all the buckets.
The sort method also seems less efficient than the bucketing method: the
relative order of the partitions does not matter, so a general sort on the
partition columns might be overkill compared with bucketing.
I feel like all 3 directions require some implementation in Arrow itself (I
did not find any approach simple enough that we could implement purely with
the existing PyArrow APIs). So **I want to get opinions on whether pursuing
Arrow-API-level utilities sounds like a good direction**. Thank you!
Specifically, for the third direction of bucketing and returning materialized
tables/batches: since Arrow has dataset.write_dataset(), which supports
partition-aware writing, I did some reading to see how it partitions and
whether we could leverage anything from it.
https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118
is where the partitioning happens. The partitioning algorithm is a full scan
with a bucket sort, leveraging the Grouper class utilities in Arrow's compute
component. Specifically:
1. Grouper.consume() initializes the groups based on the partition columns
present:
https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L422
2. Grouper.MakeGroupings() builds a ListArray where each list represents a
partition and each element in a list is a row id of the original table:
https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L886C45-L886C58
3. Grouper.ApplyGroupings() efficiently converts the grouped representation
of row ids into the actual rows:
https://github.com/apache/arrow/blob/55afcf0450aa2b611e78335bdbfd77e55ae3bc9f/cpp/src/arrow/compute/row/grouper.cc#L875
Beyond dataset writing, Grouper from Arrow's compute component is also used to
support other exposed compute APIs such as the aggregation functions. At the
end of the day, what we want (in order to support PyIceberg's partitioned
write) is an API that returns record batches/tables given an input table and a
partition scheme, so maybe we could expose such a new API under compute,
leveraging Grouper.
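As a related observation, the Grouper machinery is already reachable from Python through Table.group_by (which is backed by Grouper). A rough sketch of emulating the MakeGroupings/ApplyGroupings flow with existing APIs, assuming a recent PyArrow where the 'list' aggregation is available and the aggregate output column follows the `{column}_{aggregation}` naming convention:
```
import pyarrow as pa

def partition_with_group_by(table: pa.Table, partition_columns: list[str]) -> list[pa.Table]:
    # Tag each row with its row id, then collect the row ids belonging to each
    # partition key with a 'list' aggregation (one output row per partition).
    row_ids = pa.array(range(len(table)), type=pa.int64())
    indexed = table.append_column('__row_id', row_ids)
    groupings = indexed.group_by(partition_columns).aggregate([('__row_id', 'list')])
    # ApplyGroupings equivalent: materialize each partition with take()
    return [table.take(ids.values) for ids in groupings['__row_id_list']]
```
This is of course doing an extra aggregation pass plus one take() per group from Python; a dedicated compute API built directly on Grouper could avoid that overhead.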