"chunk" is number of rows for a table, but each column can be individually 
chunked, in which case a chunk is the number of contiguous values (only the 
values within a single column/Array are contiguous).

Otherwise, pretty much everything else is accurate, but I'd also add that there 
is some overhead from the schema and schema metadata per batch that you're not 
accounting for when just using rows to approximate size. In that case you will 
always write a larger size than anticipated with the simple average, and in 
scenarios where that size is strict (page sizes), it can be problematic. But 
the simple average is certainly a good naive place to start.

On Sun, Mar 3, 2024 at 12:40, Kevin Liu <kevin.jq....@gmail.com> wrote:  
Thank you both for the thoughtful response. The main concept I was missing was 
what “chunk” and “batch” mean in the Arrow context. It makes more sense now to 
think of “chunks” as the number of rows, and “max_chunksize” as the maximum 
number of rows in a batch.

I’m going to document my thought process to the question above for future 
reference.

In order to create record batches of a predetermined size from an Arrow table, 
we need to calculate how many rows should be in each batch. This is because the 
number of rows in a batch directly correlates to the size of the batch in 
bytes. Since Arrow is a columnar store, all columns must be present. So the 
smallest unit of storage is 1 row of all the columns. The storage size thus 
correlates to the number of rows packed into the same batch. (This is similar 
to parquet “row group” concept.)

The formula for target file size is then:
target file size = (fixed number of columns) * (variable number of rows)

We want to calculate the number of rows to pack in each batch so that each 
batch is the target file size.

To calculate number of rows to pack into a batch, we can use the average row 
size and the target file size.
(Average row size) = (table size) / (number of rows)
(Number of rows) = (target file size) / (average row size)

We can then pass number of rows to the `to_batches`’s “max_chunksize” to get a 
chunk with a desired target file size.

The above method is implemented here so that we can batch an Arrow table into 
multiple batches of 512 MB and parallel write them as multiple parquet files.

In the case where multiple small chunks are created, such as Case #1, the code 
then bin-packs the smaller chunks into the target file size.

On Mon, Feb 26, 2024 at 9:25 AM Aldrin <octalene....@pm.me> wrote:
Hi Kevin,
Shoumyo is correct that the chunk size of to_batches is row-based (logical) and 
not byte-based (physical), see the example in the documentation [1]. And for 
more clarity on the "...depending on the chunk layout of individual columns" 
portion, a Table column is a ChunkedArray, which is essentially a 
vector<Array>, so that determines the chunking for the table. When using 
to_batches(), the logic is essentially to see if a chunk is too big and needs 
to be resized [2], otherwise leave it as is.

There are functions to get total buffer size (physical size) such as 
get_total_buffer_size [3] or nbytes [4]. But, there is a bit of guess and 
check to use that directly for your needs. I don't think there is a canonical 
approach to satisfying your requirement, but you can check the docs for IPC 
streams [5] or the dataset API ("configuring rows per group" [6]) if you're 
eventually going to write to files.

Otherwise, a naive approach is essentially a loop that tries varying chunk 
sizes until it produces a satisfactory byte size. If you're willing to do some 
coding at the cython level (since you're looking at that source), then you can 
get a TableBatchReader and iteratively set a chunksize and read a next batch 
(loop around [7]); or maybe use slice() for cheap chunking. I would recommend 
making your initial chunksize guess based on your schema or by "empirically" 
checking a chunksize of 1.


[1]: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_batches
[2]: https://github.com/apache/arrow/blob/main/cpp/src/arrow/table.cc#L655-L657
[3]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.get_total_buffer_size
[4]: https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html#pyarrow.RecordBatch.nbytes
[5]: https://arrow.apache.org/docs/python/ipc.html#using-streams
[6]: https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write
[7]: https://github.com/apache/arrow/blob/main/python/pyarrow/table.pxi#L4235-L4248



# ------------------------------
# Aldrin

https://github.com/drin/
https://gitlab.com/octalene
https://keybase.io/octalene

On Monday, February 26th, 2024 at 08:01, Shoumyo Chakravorti 
(BLOOMBERG/ 731 LEX) <schakravo...@bloomberg.net> wrote:

Hi Kevin,
I'm not an Arrow dev so take everything I say with a grain of salt. I just 
wanted to point out that the `max_chunksize` appears to refer to the max number 
of *rows* per batch rather than the number of *bytes* per batch: 
https://github.com/apache/arrow/blob/b8fff043c6cb351b1fad87fa0eeaf8dbc550e37c/cpp/src/arrow/table.cc#L647-L649.
Additionally, `Table.to_batches()` is documented as being zero-copy, and as you 
referred: "Individual chunks may be smaller depending on the chunk layout of 
individual columns". This method will not copy the data in an effort to make 
RecordBatches of more uniform size.
In light of this information:
In Case #1: `example_tbl.to_batches() * multiplier` creates 2048 RecordBatches, 
and the Table becomes a zero-copy wrapper on top of the data for each of these 
RecordBatches. Each ChunkedArray in the Table will contain 2048 chunks/arrays. 
When this Table is converted via `to_batches()`, by its zero-copy nature, the 
chunks will not be concatenated. Therefore `bigger_pylist_tbl.to_batches` will 
yield at least 2048 batches.

In Case #2: `max_chunksize > len(huge_arrow_tbl)`, and you mention that the 
Table only has a single RecordBatch, so there is nothing to chunk up.

I'm not sufficiently familiar with the Arrow APIs to know of a way to chunk by 
a target number of bytes, so I'll let others chime in on that.
Best,
Shoumyo

From: user@arrow.apache.org At: 02/26/24 01:08:41 UTC-5:00
To: user@arrow.apache.org
Subject: Chunk Table into RecordBatches of at most 512MB each
Hey folks, 
I'm working with the PyArrow API for Tables and RecordBatches. And I'm trying 
to chunk a Table into a list of RecordBatches each with a default chunk size. 
For example, 10 GB into several 512MB chunks. I'm having a hard time doing this 
using the existing API. The Table.to_batches method has an optional parameter 
`max_chunksize` which is documented as "Maximum size for RecordBatch chunks. 
Individual chunks may be smaller depending on the chunk layout of individual 
columns." It seems exactly like what I want but I've run into a couple of edge 
cases.

Edge case 1, Table created using many RecordBatches
```
import pyarrow as pa

pylist = [{'n_legs': 2, 'animals': 'Flamingo'},
          {'n_legs': 4, 'animals': 'Dog'}]
pylist_tbl = pa.Table.from_pylist(pylist)
# pylist_tbl.nbytes
# > 35
multiplier = 2048
bigger_pylist_tbl = pa.Table.from_batches(example_tbl.to_batches() * multiplier)
# bigger_pylist_tbl.nbytes
# 591872 / 578.00 KB
target_batch_size = 512 * 1024 * 1024  # 512 MB
len(bigger_pylist_tbl.to_batches(target_batch_size))
# > 2048
# expected, 1 RecordBatch
```

Edge case 2, really big Table with 1 RecordBatch
```
# file already saved on disk
with pa.memory_map('table_10000000.arrow', 'r') as source:
    huge_arrow_tbl = pa.ipc.open_file(source).read_all()

huge_arrow_tbl.nbytes
# 7188263146 / 6.69 GB
len(huge_arrow_tbl)
# 10_000_000
target_batch_size = 512 * 1024 * 1024  # 512 MB
len(huge_arrow_tbl.to_batches(target_batch_size))
# > 1
# expected (6.69 GB // 512 MB) + 1 RecordBatches
```

I'm currently exploring the underlying implementation for to_batches and 
TableBatchReader::ReadNext. Please let me know if anyone knows a canonical way 
to satisfy the chunking behavior described above.

Thanks,
Kevin
