Thank you for the detailed explanation. For this use case, the resulting files are typically much smaller than the target file size, because the target is computed from the in-memory Arrow representation while Parquet compresses the data on write.
As a follow-up to the question above, is there a way to create Parquet files of the target file size from an Arrow table? Instead of sizing each batch to the target file size, I'd like the resulting files on disk to be of the target file size. The reference implementation in Java <https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L308> checks whether to roll over to a new file after every 1000 rows written.

On Sun, Mar 3, 2024 at 2:59 PM Aldrin <octalene....@pm.me> wrote:

> A "chunk" is the number of rows for a table, but each column can be individually chunked, in which case a chunk is the number of contiguous values (and only the values of a column/Array are contiguous).
>
> Otherwise, pretty much everything else is accurate, but I'd also add that there is some overhead in schema and schema metadata per batch that you're not accounting for when just using rows to approximate size. In that case you will always write a larger size than anticipated with the simple average, and in scenarios where that size limit is strict (page sizes), it can be problematic. But certainly the simple average is a good naive place to start.
>
> Sent from Proton Mail <https://proton.me/mail/home> for iOS
>
> On Sun, Mar 3, 2024 at 12:40, Kevin Liu <kevin.jq....@gmail.com> wrote:
>
> Thank you both for the thoughtful response.
>
> The main concept I was missing was what "chunk" and "batch" mean in the Arrow context. It makes more sense now to think of "chunks" in terms of the number of rows, and "max_chunksize" as the maximum number of rows in a batch.
>
> I'm going to document my thought process for the question above for future reference.
>
> In order to create record batches of a predetermined size from an Arrow table, we need to calculate how many rows should be in each batch, because the number of rows in a batch directly correlates with the size of the batch in bytes.
>
> Since Arrow is a columnar store, all columns must be present, so the smallest unit of storage is one row across all columns. The storage size thus correlates with the number of rows packed into the same batch. (This is similar to Parquet's "row group" concept.)
>
> The batch size is then approximately:
>
> (batch size) = (average row size) * (number of rows in the batch)
>
> We want to calculate the number of rows to pack into each batch so that each batch is roughly the target file size:
>
> (average row size) = (table size) / (number of rows in the table)
>
> (number of rows per batch) = (target file size) / (average row size)
>
> We can then pass the number of rows per batch as `to_batches`'s "max_chunksize" to get chunks of the desired target file size.
>
> The above method is implemented here <https://github.com/apache/iceberg-python/pull/444/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1768-R1770> so that we can batch an Arrow table into multiple batches of 512 MB and write them in parallel as multiple Parquet files.
>
> In the case where multiple small chunks are created, such as Case #1, the code then bin-packs <https://github.com/apache/iceberg-python/pull/444/files#diff-8d5e63f2a87ead8cebe2fd8ac5dcf2198d229f01e16bb9e06e21f7277c328abdR1771-R1777> the smaller chunks into the target file size.
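To make that concrete, a minimal PyArrow sketch of the rows-per-batch estimate plus bin-packing described in the quoted message might look like the following. The function name `batches_of_target_size` and its structure are illustrative assumptions, not the actual iceberg-python implementation:

```
import pyarrow as pa

TARGET_SIZE = 512 * 1024 * 1024  # 512 MB in-memory target, as in the thread above

def batches_of_target_size(table: pa.Table, target_size: int = TARGET_SIZE):
    """Yield lists of RecordBatches whose combined in-memory size is
    roughly target_size bytes (rows-per-batch estimate + bin-packing)."""
    if table.num_rows == 0:
        return
    # Estimate how many rows fit in one target-sized batch.
    avg_row_size = table.nbytes / table.num_rows          # in-memory bytes per row
    rows_per_batch = max(1, int(target_size // avg_row_size))

    group, group_bytes = [], 0
    for batch in table.to_batches(max_chunksize=rows_per_batch):
        # Bin-pack batches that came out smaller than the target
        # (e.g. because the table's existing chunk layout is finer).
        if group and group_bytes + batch.nbytes > target_size:
            yield group
            group, group_bytes = [], 0
        group.append(batch)
        group_bytes += batch.nbytes
    if group:
        yield group
```

Each yielded group can then be written in parallel as its own Parquet file, e.g. with `pyarrow.parquet.write_table(pa.Table.from_batches(group), path)`. As noted at the top of the thread, the on-disk files will still come out smaller than the in-memory target because of Parquet's compression; matching the on-disk size would require checking the actual written length and rolling over to a new file, as the Java BaseTaskWriter does.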
>
> On Mon, Feb 26, 2024 at 9:25 AM Aldrin <octalene....@pm.me> wrote:
>
>> Hi Kevin,
>>
>> Shoumyo is correct that the chunk size of to_batches is row-based (logical) and not byte-based (physical); see the example in the documentation [1]. And for more clarity on the "...depending on the chunk layout of individual columns" portion, a Table column is a ChunkedArray, which is essentially a vector<Array>, so that determines the chunking for the table. When using to_batches(), the logic is essentially to see if a chunk is too big and needs to be resized [2], and otherwise leave it as is.
>>
>> There are functions to get the total buffer size (physical size), such as get_total_buffer_size [3] or nbytes [4], but there is a bit of guess-and-check involved in using those directly for your needs. I don't think there is a canonical approach to satisfying your requirement, but you can check the docs for IPC streams [5] or the dataset API ("configuring rows per group" [6]) if you're eventually going to write to files.
>>
>> Otherwise, a naive approach is essentially a loop that tries varying chunk sizes until it produces a satisfactory byte size. If you're willing to do some coding at the Cython level (since you're looking at that source), then you can get a TableBatchReader and iteratively set a chunksize and read the next batch (loop around [7]); or maybe use slice() for cheap chunking. I would recommend making your initial chunksize guess based on your schema or by "empirically" checking a chunksize of 1.
>>
>> [1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_batches
>> [2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/table.cc#L655-L657
>> [3]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.get_total_buffer_size
>> [4]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.nbytes
>> [5]: https://arrow.apache.org/docs/python/ipc.html#using-streams
>> [6]: https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write
>> [7]: https://github.com/apache/arrow/blob/main/python/pyarrow/table.pxi#L4235-L4248
>>
>> # ------------------------------
>> # Aldrin
>>
>> https://github.com/drin/
>> https://gitlab.com/octalene
>> https://keybase.io/octalene
>>
>> On Monday, February 26th, 2024 at 08:01, Shoumyo Chakravorti (BLOOMBERG/ 731 LEX) <schakravo...@bloomberg.net> wrote:
>>
>> Hi Kevin,
>>
>> I'm not an Arrow dev, so take everything I say with a grain of salt. I just wanted to point out that `max_chunksize` appears to refer to the maximum number of *rows* per batch rather than the number of *bytes* per batch: https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L647-L649.
>>
>> Additionally, `Table.to_batches()` is documented as being zero-copy, and as you referred: "Individual chunks may be smaller depending on the chunk layout of individual columns". This method will not copy the data in an effort to make RecordBatches of more uniform size.
>>
>> In light of this information:
>>
>> - In Case #1: `pylist_tbl.to_batches() * multiplier` creates 2048 RecordBatches, and the Table becomes a zero-copy wrapper on top of the data for each of these RecordBatches. Each ChunkedArray in the Table will contain 2048 chunks/arrays.
>> When this Table is converted via `to_batches()`, by its zero-copy nature, the chunks will not be concatenated. Therefore `bigger_pylist_tbl.to_batches()` will yield at least 2048 batches.
>>
>> - In Case #2: `max_chunksize > len(huge_arrow_tbl)`, and you mention that the Table only has a single RecordBatch, so there is nothing to chunk up.
>>
>> I'm not sufficiently familiar with the Arrow APIs to know of a way to chunk by a target number of bytes, so I'll let others chime in on that.
>>
>> Best,
>> Shoumyo
>>
>> From: user@arrow.apache.org At: 02/26/24 01:08:41 UTC-5:00
>> To: user@arrow.apache.org
>> Subject: Chunk Table into RecordBatches of at most 512MB each
>>
>> Hey folks,
>>
>> I'm working with the PyArrow API for Tables and RecordBatches, and I'm trying to chunk a Table into a list of RecordBatches, each with a default chunk size. For example, 10 GB into several 512 MB chunks.
>>
>> I'm having a hard time doing this with the existing API. The Table.to_batches method has an optional parameter `max_chunksize`, which is documented as "Maximum size for RecordBatch chunks. Individual chunks may be smaller depending on the chunk layout of individual columns." It seems like exactly what I want, but I've run into a couple of edge cases.
>>
>> Edge case 1, Table created using many RecordBatches:
>> ```
>> import pyarrow as pa
>>
>> pylist = [{'n_legs': 2, 'animals': 'Flamingo'},
>>           {'n_legs': 4, 'animals': 'Dog'}]
>> pylist_tbl = pa.Table.from_pylist(pylist)
>> # pylist_tbl.nbytes
>> # > 35
>> multiplier = 2048
>> bigger_pylist_tbl = pa.Table.from_batches(pylist_tbl.to_batches() * multiplier)
>> # bigger_pylist_tbl.nbytes
>> # 591872 / 578.00 KB
>>
>> target_batch_size = 512 * 1024 * 1024  # 512 MB
>> len(bigger_pylist_tbl.to_batches(target_batch_size))
>> # > 2048
>> # expected: 1 RecordBatch
>> ```
>>
>> Edge case 2, really big Table with 1 RecordBatch:
>> ```
>> # file already saved on disk
>> with pa.memory_map('table_10000000.arrow', 'r') as source:
>>     huge_arrow_tbl = pa.ipc.open_file(source).read_all()
>>
>> huge_arrow_tbl.nbytes
>> # 7188263146 / 6.69 GB
>> len(huge_arrow_tbl)
>> # 10_000_000
>>
>> target_batch_size = 512 * 1024 * 1024  # 512 MB
>> len(huge_arrow_tbl.to_batches(target_batch_size))
>> # > 1
>> # expected: (6.69 GB // 512 MB) + 1 RecordBatches
>> ```
>>
>> I'm currently exploring the underlying implementation for to_batches <https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/python/pyarrow/table.pxi#L4182C26-L4250> and TableBatchReader::ReadNext <https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L641-L691>. Please let me know if anyone knows a canonical way to satisfy the chunking behavior described above.
>>
>> Thanks,
>> Kevin
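A rough Python sketch of the naive guess-and-check approach Aldrin describes earlier in the thread (loop over candidate chunk sizes, using slice() for cheap zero-copy chunking, with the initial guess taken "empirically" from a one-row slice) might look like the following. The function name `slice_by_bytes` and the halving heuristic are illustrative assumptions, not an Arrow API:

```
import pyarrow as pa

def slice_by_bytes(table: pa.Table, max_bytes: int):
    """Yield zero-copy slices of table, each at most roughly max_bytes
    in memory, by guess-and-check on the slice length."""
    if table.num_rows == 0:
        return
    # Initial guess "empirically" measured from a single-row slice.
    bytes_per_row = max(1, table.slice(0, 1).nbytes)
    guess = max(1, max_bytes // bytes_per_row)

    offset = 0
    while offset < table.num_rows:
        length = guess
        chunk = table.slice(offset, length)
        # Shrink the slice until it fits the byte budget.
        while chunk.nbytes > max_bytes and length > 1:
            length //= 2
            chunk = table.slice(offset, length)
        yield chunk
        offset += chunk.num_rows
```

Here nbytes measures the in-memory buffer sizes referenced by the slice (it accounts for array offsets), so, as discussed at the top of the thread, any Parquet file written from these slices will generally end up smaller after compression.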