"chunk" is number of rows for a table, but each column can be individually chunked, in which case a chunk is number of contiguous values (and only values of a column/Array are contiguous). Otherwise, pretty much everything else is accurate, but I'd also add that there is some overhead in schema and schema metadata per batch that you're not accounting for when just using rows to approximate size. In that case you will always write a larger size than anticipated with the simple average and in scenarios where that size is strict (page sizes), it can be problematic. But certainly the simple average is a good naive place to start.
On Sun, Mar 3, 2024 at 12:40, Kevin Liu <kevin.jq....@gmail.com> wrote:

Thank you both for the thoughtful responses. The main concept I was missing was what "chunk" and "batch" mean in the Arrow context. It makes more sense now to think of a "chunk" as a number of rows, and "max_chunksize" as the maximum number of rows in a batch.

I'm going to document my thought process on the question above for future reference.

To create record batches of a predetermined size from an Arrow table, we need to calculate how many rows should go in each batch. This works because the number of rows in a batch directly correlates with the size of the batch in bytes. Since Arrow is a columnar format, every column must be present, so the smallest unit of storage is one row across all columns, and the storage size grows with the number of rows packed into the same batch. (This is similar to the Parquet "row group" concept.) Since the number of columns is fixed, the batch size scales with the row count:

    target file size = (average row size) * (number of rows per batch)

To calculate the number of rows to pack into each batch, we can use the average row size and the target file size:

    average row size = (table size) / (number of rows)
    number of rows per batch = (target file size) / (average row size)

We can then pass this number of rows as `to_batches`'s "max_chunksize" to get batches of roughly the desired target file size. The above method is implemented here so that we can split an Arrow table into multiple batches of 512 MB each and write them in parallel as multiple Parquet files. In the case where multiple small chunks are created, such as Case #1, the code then bin-packs the smaller chunks up to the target file size.

On Mon, Feb 26, 2024 at 9:25 AM Aldrin <octalene....@pm.me> wrote:

Hi Kevin,

Shoumyo is correct that the chunk size of to_batches is row-based (logical) rather than byte-based (physical); see the example in the documentation [1]. For more clarity on the "...depending on the chunk layout of individual columns" portion: a Table column is a ChunkedArray, which is essentially a vector<Array>, and that determines the chunking for the table. When using to_batches(), the logic is essentially to check whether a chunk is too big and needs to be resized [2], and otherwise leave it as is.

There are functions to get the total buffer size (physical size), such as get_total_buffer_size [3] or nbytes [4], but there is a bit of guess-and-check involved in using them directly for your needs. I don't think there is a canonical approach to satisfying your requirement, but you can check the docs for IPC streams [5] or the dataset API ("configuring rows per group" [6]) if you're eventually going to write to files. Otherwise, a naive approach is essentially a loop that tries varying chunk sizes until it produces a satisfactory byte size. If you're willing to do some coding at the Cython level (since you're looking at that source), you can get a TableBatchReader, then iteratively set a chunksize and read the next batch (loop around [7]); or maybe use slice() for cheap chunking (see the sketch just below). I would recommend making your initial chunksize guess based on your schema or by "empirically" checking a chunksize of 1.
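A rough sketch of the average-row-size approach described above, assuming pyarrow (`batches_for_target_size` is a made-up name, not the linked implementation):

```
import pyarrow as pa

def batches_for_target_size(table: pa.Table,
                            target_bytes: int = 512 * 1024 * 1024):
    # average row size = (table size) / (number of rows)
    avg_row_size = table.nbytes / table.num_rows
    # number of rows per batch = (target file size) / (average row size)
    rows_per_batch = max(1, int(target_bytes // avg_row_size))
    # max_chunksize is a row count; batches may still come out smaller,
    # since to_batches() is zero-copy and never concatenates chunks.
    return table.to_batches(max_chunksize=rows_per_batch)
```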
[1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_batches
[2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/table.cc#L655-L657
[3]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.get_total_buffer_size
[4]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.nbytes
[5]: https://arrow.apache.org/docs/python/ipc.html#using-streams
[6]: https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write
[7]: https://github.com/apache/arrow/blob/main/python/pyarrow/table.pxi#L4235-L4248

# ------------------------------
# Aldrin
https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene

On Monday, February 26th, 2024 at 08:01, Shoumyo Chakravorti (BLOOMBERG/ 731 LEX) <schakravo...@bloomberg.net> wrote:

Hi Kevin,

I'm not an Arrow dev, so take everything I say with a grain of salt. I just wanted to point out that `max_chunksize` appears to refer to the maximum number of *rows* per batch rather than the number of *bytes* per batch: https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L647-L649. Additionally, `Table.to_batches()` is documented as being zero-copy, and, as you noted: "Individual chunks may be smaller depending on the chunk layout of individual columns". This method will not copy data in order to make the RecordBatches more uniform in size.

In light of this information:

In Case #1: `pylist_tbl.to_batches() * multiplier` creates 2048 RecordBatches, and the Table becomes a zero-copy wrapper on top of the data for each of these RecordBatches. Each ChunkedArray in the Table will contain 2048 chunks/arrays. When this Table is converted via `to_batches()`, by its zero-copy nature, the chunks will not be concatenated. Therefore `bigger_pylist_tbl.to_batches()` will yield at least 2048 batches.

In Case #2: `max_chunksize > len(huge_arrow_tbl)`, and you mention that the Table only has a single RecordBatch, so there is nothing to chunk up.

I'm not sufficiently familiar with the Arrow APIs to know of a way to chunk by a target number of bytes, so I'll let others chime in on that.

Best,
Shoumyo

From: user@arrow.apache.org
At: 02/26/24 01:08:41 UTC-5:00
To: user@arrow.apache.org
Subject: Chunk Table into RecordBatches of at most 512MB each

Hey folks,

I'm working with the PyArrow API for Tables and RecordBatches, and I'm trying to chunk a Table into a list of RecordBatches, each no larger than a given size. For example, 10 GB into several 512 MB chunks. I'm having a hard time doing this with the existing API. The Table.to_batches method has an optional parameter `max_chunksize`, documented as "Maximum size for RecordBatch chunks. Individual chunks may be smaller depending on the chunk layout of individual columns." It seems like exactly what I want, but I've run into a couple of edge cases.
Edge case 1, a Table created from many RecordBatches:

```
import pyarrow as pa

pylist = [{'n_legs': 2, 'animals': 'Flamingo'},
          {'n_legs': 4, 'animals': 'Dog'}]
pylist_tbl = pa.Table.from_pylist(pylist)
pylist_tbl.nbytes
# > 35
multiplier = 2048
bigger_pylist_tbl = pa.Table.from_batches(pylist_tbl.to_batches() * multiplier)
bigger_pylist_tbl.nbytes
# > 591872 / 578.00 KB
target_batch_size = 512 * 1024 * 1024  # 512 MB
len(bigger_pylist_tbl.to_batches(target_batch_size))
# > 2048
# expected: 1 RecordBatch
```

Edge case 2, a really big Table with 1 RecordBatch:

```
# file already saved on disk
with pa.memory_map('table_10000000.arrow', 'r') as source:
    huge_arrow_tbl = pa.ipc.open_file(source).read_all()
huge_arrow_tbl.nbytes
# > 7188263146 / 6.69 GB
len(huge_arrow_tbl)
# > 10_000_000
target_batch_size = 512 * 1024 * 1024  # 512 MB
len(huge_arrow_tbl.to_batches(target_batch_size))
# > 1
# expected: (6.69 GB // 512 MB) + 1 RecordBatches
```

I'm currently exploring the underlying implementation of to_batches and TableBatchReader::ReadNext. Please let me know if anyone knows a canonical way to get the chunking behavior described above.

Thanks,
Kevin
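For what it's worth, a naive workaround along the lines of the replies above, assuming pyarrow (`chunk_by_estimated_bytes` is a made-up helper, not a canonical API): `combine_chunks()` copies the data into one contiguous chunk, which sidesteps edge case 1, and zero-copy `slice()` calls then carve out pieces of an estimated byte size.

```
import pyarrow as pa

def chunk_by_estimated_bytes(table: pa.Table,
                             target_bytes: int = 512 * 1024 * 1024):
    # Concatenate the (possibly many tiny) chunks first; this copies,
    # but avoids edge case 1, where to_batches() never merges chunks.
    table = table.combine_chunks()
    # Estimate how many rows fit in the target byte budget.
    avg_row_size = table.nbytes / table.num_rows
    rows_per_slice = max(1, int(target_bytes // avg_row_size))
    # Zero-copy slices of roughly target_bytes each (addresses edge
    # case 2, where a row-based max_chunksize yields a single batch).
    for offset in range(0, table.num_rows, rows_per_slice):
        yield table.slice(offset, rows_per_slice)
```

On the 6.69 GB table above, `list(chunk_by_estimated_bytes(huge_arrow_tbl))` should produce roughly (6.69 GB // 512 MB) + 1 slices, matching the expectation in edge case 2.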