Thank you for the detailed explanation.
For this use case, the files written from the Arrow representation are
typically much smaller than the target file size due to Parquet's
compression.

As a follow-up to the question above, is there a way to create Parquet
files of the target file size from an Arrow table? Rather than sizing each
batch to the target file size, I'd like the resulting files on disk to be
the target file size.
The reference implementation in Java
<https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L308>
checks whether to roll over to a new file after every 1000 rows written.
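
Roughly, what I have in mind is a writer that checks the bytes flushed so far
and rolls over to a new file, e.g. the untested sketch below (the helper name,
constants, and path template are just placeholders, and the file sizes are only
approximate since the check runs between batches and the writer may buffer):

```
import pyarrow as pa
import pyarrow.parquet as pq

TARGET_FILE_SIZE = 512 * 1024 * 1024  # illustrative target, 512 MB
ROWS_PER_CHECK = 1000                 # mirrors the Java roll-over check interval

def write_rolling(table: pa.Table, path_template: str = "data-{:05d}.parquet"):
    """Roll over to a new parquet file once the bytes flushed so far reach the
    target size. Sizes are approximate: the check runs between batches and the
    writer may still buffer some data."""
    file_index, writer, sink = 0, None, None
    for batch in table.to_batches(max_chunksize=ROWS_PER_CHECK):
        if writer is None:
            sink = pa.OSFile(path_template.format(file_index), "wb")
            writer = pq.ParquetWriter(sink, table.schema)
        # each write becomes its own row group, which is fine for a sketch
        writer.write_batch(batch)
        if sink.tell() >= TARGET_FILE_SIZE:  # compressed bytes written so far
            writer.close()
            sink.close()
            writer, sink, file_index = None, None, file_index + 1
    if writer is not None:
        writer.close()
        sink.close()
```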



On Sun, Mar 3, 2024 at 2:59 PM Aldrin <octalene....@pm.me> wrote:

> "chunk" is number of rows for a table, but each column can be individually
> chunked, in which case a chunk is number of contiguous values (and only
> values of a column/Array are contiguous).
>
> Otherwise, pretty much everything else is accurate, but I'd also add that
> there is some overhead in schema and schema metadata per batch that you're
> not accounting for when just using rows to approximate size. In that case
> you will always write a larger size than anticipated with the simple
> average and in scenarios where that size is strict (page sizes), it can be
> problematic. But certainly the simple average is a good naive place to
> start.
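>
> For example, you can see that per-batch overhead by comparing a batch's
> in-memory buffer size with its serialized size in the IPC stream format. A
> rough, untested illustration:
>
> ```
> import pyarrow as pa
>
> batch = pa.RecordBatch.from_pydict(
>     {"n_legs": [2, 4], "animals": ["Flamingo", "Dog"]}
> )
>
> sink = pa.BufferOutputStream()
> with pa.ipc.new_stream(sink, batch.schema) as writer:
>     writer.write_batch(batch)
>
> print(batch.nbytes)          # raw buffer bytes only
> print(sink.getvalue().size)  # adds schema + message metadata per batch
> ```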
>
>
> On Sun, Mar 3, 2024 at 12:40, Kevin Liu <kevin.jq....@gmail.com> wrote:
>
> Thank you both for the thoughtful response.
>
> The main concept I was missing was what “chunk” and “batch” mean in the
> Arrow context. It makes more sense now to think of a “chunk” as a contiguous
> run of rows, and “max_chunksize” as the maximum number of rows in a batch.
>
> I’m going to document my thought process for the question above, for future
> reference.
>
> In order to create record batches of a predetermined size from an Arrow
> table, we need to calculate how many rows should go in each batch, because
> the number of rows in a batch directly correlates with its size in bytes.
>
> Since Arrow is a columnar format, every batch contains all of the columns,
> so the smallest unit we can pack is one row across all columns. The size of
> a batch in bytes therefore scales with the number of rows packed into it.
> (This is similar to Parquet’s “row group” concept.)
>
> The (approximate) formula for batch size is then:
>
> (batch size in bytes) = (bytes per row, fixed by the columns) * (number of rows)
>
> We want to calculate the number of rows to pack in each batch so that each
> batch is the target file size.
>
> To calculate number of rows to pack into a batch, we can use the average
> row size and the target file size.
>
> (Average row size) = (table size) / (number of rows)
>
> (Number of rows) = (target file size) / (average row size)
>
> We can then pass this number of rows as `to_batches`’s “max_chunksize” to
> get batches of roughly the desired target file size.
>
> The above method is implemented here
> <https://github.com/apache/iceberg-python/pull/444/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1768-R1770>
> so that we can split an Arrow table into multiple batches of roughly 512 MB
> and write them in parallel as multiple Parquet files.
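>
> In code, the calculation is roughly the following (untested sketch; the
> function name and the 512 MB default are mine, not taken from the PR):
>
> ```
> import pyarrow as pa
>
> def batches_of_target_size(table: pa.Table, target_size: int = 512 * 1024 * 1024):
>     """Approximate batches of `target_size` bytes via the average row size."""
>     if len(table) == 0:
>         return table.to_batches()
>     avg_row_size = table.nbytes / len(table)          # (table size) / (number of rows)
>     rows_per_batch = int(target_size / avg_row_size)  # (target size) / (avg row size)
>     return table.to_batches(max_chunksize=max(rows_per_batch, 1))
> ```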
>
> In the case where multiple small chunks are created, such as Case #1 below,
> the code then bin-packs
> <https://github.com/apache/iceberg-python/pull/444/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1771-R1777>
> the smaller chunks into groups of roughly the target file size.
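>
> For illustration, that bin-packing step looks roughly like the greedy sketch
> below (not the exact PR code; the names are mine):
>
> ```
> import pyarrow as pa
>
> def bin_pack_batches(batches, target_size: int = 512 * 1024 * 1024):
>     """Greedily group small record batches into bins of about target_size bytes."""
>     bins, current, current_size = [], [], 0
>     for batch in batches:
>         if current and current_size + batch.nbytes > target_size:
>             bins.append(current)
>             current, current_size = [], 0
>         current.append(batch)
>         current_size += batch.nbytes
>     if current:
>         bins.append(current)
>     # each bin can then be written out as one parquet file,
>     # e.g. via pa.Table.from_batches(bin)
>     return bins
> ```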
>
>
> On Mon, Feb 26, 2024 at 9:25 AM Aldrin <octalene....@pm.me> wrote:
>
>> Hi Kevin,
>>
>> Shoumyo is correct that the chunk size of to_batches is row-based
>> (logical) and not byte-based (physical), see the example in the
>> documentation [1]. And for more clarity on the "...depending on the chunk
>> layout of individual columns" portion, a Table column is a ChunkedArray,
>> which is essentially a vector<Array>, so that determines the chunking
>> for the table. When using to_batches(), the logic is essentially to see if
>> a chunk is too big and needs to be resized [2], otherwise leave it as is.
>>
>> There are functions to get the total buffer size (physical size), such as
>> get_total_buffer_size [3] or nbytes [4], but there is a bit of
>> guess-and-check involved in using those directly for your needs. I don't
>> think there is a canonical approach to satisfying your requirement, but you
>> can check the docs for IPC streams [5] or the dataset API ("configuring rows
>> per group" [6]) if you're eventually going to write to files.
>>
>> Otherwise, a naive approach is essentially a loop that tries varying
>> chunk sizes until it produces a satisfactory byte size. If you're willing
>> to do some coding at the Cython level (since you're looking at that
>> source), then you can get a TableBatchReader, iteratively set a chunksize,
>> and read the next batch (loop around [7]); or maybe use slice() for cheap
>> chunking. I would recommend making your initial chunksize guess based on
>> your schema or by "empirically" checking a chunksize of 1.
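>>
>> As a rough, untested sketch of that loop using slice() (the function name
>> and the one-row initial guess are just for illustration):
>>
>> ```
>> import pyarrow as pa
>>
>> def chunk_by_bytes(table: pa.Table, target_size: int):
>>     """Yield zero-copy slices whose buffer size is roughly target_size bytes."""
>>     # "empirical" initial guess based on a 1-row slice
>>     row_size = max(table.slice(0, 1).nbytes, 1)
>>     rows_per_chunk = max(int(target_size / row_size), 1)
>>     offset = 0
>>     while offset < len(table):
>>         chunk = table.slice(offset, rows_per_chunk)
>>         # a fancier version could refine rows_per_chunk from chunk.nbytes here
>>         yield chunk
>>         offset += len(chunk)
>> ```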
>>
>>
>> [1]:
>> https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_batches
>> [2]:
>> https://github.com/apache/arrow/blob/main/cpp/src/arrow/table.cc#L655-L657
>> [3]:
>> https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.get_total_buffer_size
>> [4]:
>> https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.nbytes
>> [5]: https://arrow.apache.org/docs/python/ipc.html#using-streams
>> [6]:
>> https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write
>> [7]:
>> https://github.com/apache/arrow/blob/main/python/pyarrow/table.pxi#L4235-L4248
>>
>>
>> # ------------------------------
>> # Aldrin
>>
>> https://github.com/drin/
>> https://gitlab.com/octalene
>> https://keybase.io/octalene
>>
>> On Monday, February 26th, 2024 at 08:01, Shoumyo Chakravorti (BLOOMBERG/
>> 731 LEX) <schakravo...@bloomberg.net> wrote:
>>
>> Hi Kevin,
>>
>> I'm not an Arrow dev, so take everything I say with a grain of salt. I just
>> wanted to point out that the `max_chunksize` appears to refer to the max
>> number of *rows* per batch rather than the number of *bytes* per batch:
>> https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L647-L649
>> .
>>
>> Additionally, `Table.to_batches()` is documented as being zero-copy, and,
>> as you quoted: "Individual chunks may be smaller depending on the chunk
>> layout of individual columns". In other words, this method will not copy
>> data just to make the RecordBatches more uniform in size.
>>
>> In light of this information:
>>
>>
>>    - In Case #1: `pylist_tbl.to_batches() * multiplier` creates 2048
>>    RecordBatches, and the Table becomes a zero-copy wrapper on top of the
>>    data for each of these RecordBatches. Each ChunkedArray in the Table will
>>    contain 2048 chunks/arrays. When this Table is converted via
>>    `to_batches()`, by its zero-copy nature, the chunks will not be
>>    concatenated. Therefore `bigger_pylist_tbl.to_batches()` will yield at
>>    least 2048 batches.
>>
>>    - In Case #2: `max_chunksize > len(huge_arrow_tbl)`, and you mention
>>    that the Table only has a single RecordBatch, so there is nothing to chunk
>>    up.
>>
>>
>> I'm not sufficiently familiar with the Arrow APIs to know of a way to
>> chunk by a target number of bytes, so I'll let others chime in on that.
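>>
>> One small thing that might help with Case #1 specifically (untested, and
>> only if an extra copy is acceptable): `Table.combine_chunks()` concatenates
>> the many tiny chunks into one contiguous chunk per column, after which
>> `to_batches(max_chunksize=...)` can split purely on row count. Reusing
>> `bigger_pylist_tbl` and `target_batch_size` from your Case #1 snippet:
>>
>> ```
>> # copies data so each column has a single contiguous chunk
>> contiguous_tbl = bigger_pylist_tbl.combine_chunks()
>> len(contiguous_tbl.to_batches(max_chunksize=target_batch_size))
>> # > 1   (the single RecordBatch you expected in Case #1)
>> ```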
>>
>> Best,
>> Shoumyo
>>
>> From: user@arrow.apache.org At: 02/26/24 01:08:41 UTC-5:00
>> To: user@arrow.apache.org
>> Subject: Chunk Table into RecordBatches of at most 512MB each
>>
>> Hey folks,
>>
>> I'm working with the PyArrow API for Tables and RecordBatches, and I'm
>> trying to chunk a Table into a list of RecordBatches, each of at most a
>> given size; for example, splitting 10 GB into several 512 MB chunks.
>>
>> I'm having a hard time doing this using the existing API. The
>> Table.to_batches method has an optional parameter `max_chunksize`, which
>> is documented as "Maximum size for RecordBatch chunks. Individual chunks
>> may be smaller depending on the chunk layout of individual columns." It
>> seems like exactly what I want, but I've run into a couple of edge cases.
>>
>> Edge case 1, Table created using many RecordBatches
>> ```
>> pylist = [{'n_legs': 2, 'animals': 'Flamingo'},
>> {'n_legs': 4, 'animals': 'Dog'}]
>> pylist_tbl = pa.Table.from_pylist(pylist)
>> # pylist_tbl.nbytes
>> # > 35
>> multiplier = 2048
>> bigger_pylist_tbl = pa.Table.from_batches(pylist_tbl.to_batches() * multiplier)
>> # bigger_pylist_tbl.nbytes
>> # 591872 / 578.00 KB
>>
>> target_batch_size = 512 * 1024 * 1024 # 512 MB
>> len(bigger_pylist_tbl.to_batches(target_batch_size))
>> # > 2048
>> # expected, 1 RecordBatch
>> ```
>>
>> Edge case 2, really big Table with 1 RecordBatch
>> ```
>> # file already saved on disk
>> with pa.memory_map('table_10000000.arrow', 'r') as source:
>>     huge_arrow_tbl = pa.ipc.open_file(source).read_all()
>>
>> huge_arrow_tbl.nbytes
>> # 7188263146 / 6.69 GB
>> len(huge_arrow_tbl)
>> # 10_000_000
>>
>> target_batch_size = 512 * 1024 * 1024 # 512 MB
>> len(huge_arrow_tbl.to_batches(target_batch_size))
>> # > 1
>> # expected: (6.69 GB // 512 MB) + 1 = 14 RecordBatches
>> ```
>>
>> I'm currently exploring the underlying implementation for to_batches
>> <https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/python/pyarrow/table.pxi#L4182C26-L4250>
>> and TableBatchReader::ReadNext
>> <https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L641-L691>
>> .
>> Please let me know if anyone knows a canonical way to satisfy the
>> chunking behavior described above.
>>
>> Thanks,
>> Kevin
>>
>>
