This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 8042ea288e Pluggable page spilling API for the Parquet ArrowWriter (PageStore) (#10020)
8042ea288e is described below
commit 8042ea288e084107b602f9e25a850314942567b6
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Jun 4 12:24:25 2026 -0500
Pluggable page spilling API for the Parquet ArrowWriter (PageStore) (#10020)
- closes https://github.com/apache/arrow-rs/issues/10071
## Problem description
We currently buffer entire row groups in memory. From [our own
docs](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#memory-usage-and-limiting):
> The nature of Parquet requires buffering of an entire row group before
it can be flushed to the underlying writer.
For our production workload, which has ~400 columns with large data skew (some columns much larger than others), this means ≥12 GB of memory is consumed *just to write Parquet*.
When `ArrowWriter` writes a row group, record batches arrive with all columns interleaved, but each Parquet column chunk must be contiguous on disk. So every column's *compressed* pages are buffered for the whole row group and only spliced into the output at flush. Peak `ArrowWriter` memory is therefore ≈ Σ(compressed bytes of every column chunk) for one row group, and it grows with the row group size.
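A back-of-the-envelope sketch of that formula, using hypothetical chunk sizes rather than numbers from this PR: whole-chunk buffering costs the sum of every compressed column chunk, whereas handing each completed page off as soon as it is produced leaves roughly one in-flight page per column resident (a simplification that ignores dictionaries and bloom filters). Both function names are illustrative only.

```rust
/// Peak bytes held when every column's compressed chunk stays on the heap
/// until the row group is flushed: the sum over all column chunks.
pub fn peak_whole_chunk_buffering(chunk_bytes: &[u64]) -> u64 {
    chunk_bytes.iter().sum()
}

/// Rough bound when each completed page is handed off (e.g. spilled) as soon
/// as it is produced: every column keeps about one in-flight page resident.
/// Hypothetical simplification: dictionaries and bloom filters are ignored.
pub fn peak_with_spilling(num_columns: u64, page_bytes: u64) -> u64 {
    num_columns * page_bytes
}
```

With 399 hypothetical 4 MiB columns plus one 2 GiB text column, the first function gives 3644 MiB, while the second, assuming 1 MiB pages, gives about 400 MiB regardless of how large the row group grows.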
Today the only lever against this is to watch [ArrowWriter::in_progress_size](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#method.in_progress_size) and flush smaller row groups, which trades away compression and read-time (page/row-group) pruning, hurting both encoding efficiency and read performance. Parquet already has pages, so one column should not have to dictate the layout of another. Ideally, in a case like this we would want a large row group (let's say 1M rows) with ~4 pages of ~1MB for the `id: i32` column and N pages of ~1MB for the `large_text` column. Reading the small `id` column would then pay no data-fragmentation penalty, no page-index bloat penalty, etc.
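To make that ideal layout concrete, the arithmetic below counts how many ~1 MiB pages each column of a 1M-row group would need. It is a sketch that ignores encoding and compression, and the 1 KiB average width for `large_text` is an assumption.

```rust
/// Pages needed for `rows` values of `bytes_per_value` bytes each when pages
/// are cut at about `page_bytes` (ignores encoding and compression).
pub fn pages_needed(rows: u64, bytes_per_value: u64, page_bytes: u64) -> u64 {
    // Round up: a partially filled last page still counts as a page.
    (rows * bytes_per_value).div_ceil(page_bytes)
}
```

`pages_needed(1_000_000, 4, 1 << 20)` returns 4 for the `id: i32` column (4,000,000 bytes / 1,048,576 rounds up to 4), while a hypothetical 1 KiB-per-value `large_text` column needs 977 pages. Neither column forces the other's page or row-group boundaries.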
## Related issues
Some of the issues I could find
- https://github.com/apache/arrow-rs/issues/5828
- https://github.com/apache/arrow-rs/issues/5450
- https://github.com/apache/arrow-rs/issues/5484
## Proposed solution
Introduce a trait for pluggable page buffering. In particular, we would like to implement spilling (writing buffered, completed pages to disk). If this works well, a spilling backend can be upstreamed and made easily configurable and usable for all Arrow users. I am not adding a spilling implementation here, to avoid debating its API (is it a temp dir? how does it get configured? etc.).
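This PR deliberately ships no spilling backend. As a hedged illustration of what one could look like, here is a std-only sketch of a temp-file store behind a minimal put/take contract. `BlobStore`, `Key`, and `TempFileStore` are hypothetical stand-ins, not the parquet crate's API (the real `PageStore` works with `bytes::Bytes` and `parquet::errors::Result`).

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;

/// Opaque handle minted by the store (hypothetical mirror of `PageKey`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Key(u64);

/// Minimal contract: `put` hands a blob to the store, `take` returns it once.
pub trait BlobStore {
    fn put(&mut self, blob: Vec<u8>) -> io::Result<Key>;
    fn take(&mut self, key: Key) -> io::Result<Vec<u8>>;
}

/// Spills every blob to an append-only temp file; only (offset, len)
/// extents stay on the heap.
pub struct TempFileStore {
    file: File,
    path: PathBuf,
    extents: Vec<Option<(u64, usize)>>,
    end: u64,
}

impl TempFileStore {
    pub fn new(path: PathBuf) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)?;
        Ok(Self { file, path, extents: Vec::new(), end: 0 })
    }
}

impl BlobStore for TempFileStore {
    fn put(&mut self, blob: Vec<u8>) -> io::Result<Key> {
        // Append at the end of the file; `blob` is dropped on return,
        // releasing the heap copy.
        self.file.seek(SeekFrom::Start(self.end))?;
        self.file.write_all(&blob)?;
        self.extents.push(Some((self.end, blob.len())));
        self.end += blob.len() as u64;
        Ok(Key(self.extents.len() as u64 - 1))
    }

    fn take(&mut self, key: Key) -> io::Result<Vec<u8>> {
        // Clearing the slot enforces "take exactly once".
        let (offset, len) = self
            .extents
            .get_mut(key.0 as usize)
            .and_then(Option::take)
            .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "unknown or taken key"))?;
        let mut buf = vec![0u8; len];
        self.file.seek(SeekFrom::Start(offset))?;
        self.file.read_exact(&mut buf)?;
        Ok(buf)
    }
}

impl Drop for TempFileStore {
    fn drop(&mut self) {
        // Best-effort cleanup of the spill file.
        let _ = std::fs::remove_file(&self.path);
    }
}
```

Where the file lives and how it is cleaned up are exactly the configuration questions left open above; this sketch simply takes a path.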
## What changes are included in this PR?
A small, intentionally "dumb" key/value store trait and its wiring, in four stacked commits:
1. **`PageStore` + `PageKey` + `PageStoreFactory` + `InMemoryPageStore`, wired into `ArrowWriter`.** The store maps an opaque, store-allocated `PageKey` to a blob of bytes and knows nothing about pages, dictionaries, ordering, or offsets; the caller keeps the handles and decides what they mean. The default `InMemoryPageStore` (a `Vec<Bytes>`) adds no overhead and produces equivalent, valid output: byte-for-byte identical to the previous buffering for non-dictionary columns, while dictionary columns differ only in the `page_encoding_stats` ordering (see commit 4). A `PageStoreFactory` is threaded through `ArrowWriterOptions::with_page_store_factory` → `ArrowRowGroupWriterFactory` → `ArrowColumnWriterFactory`.
2. **Stream column chunks out of the store at splice.** Replaces the materialize-then-copy splice with a `Read` that takes each page blob back out of the store in write order *as it is consumed* and releases it immediately, so the splice never holds more than one page in memory at a time (essential for a spilling backend on skewed schemas). `append_column` is unchanged for external `ChunkReader` callers.
3. **Public `PageKey::new`/`get`** (so external backends can mint their own handles) **+ a memory regression test** (in-tree thread-local tracking allocator) with a temp-file backend.
4. **Spill dictionary-column data pages too.** Dictionary-encoded columns buffered every completed data page in `GenericColumnWriter.data_pages` until `close()` (the dictionary page must be written first but isn't final until all values are seen), so those pages never reached the store. A new `PageWriter::defers_dictionary_ordering()` lets a writer that buffers the whole chunk and splices it later (the Arrow path) accept data pages *before* the dictionary page and order them itself; the column writer then streams dictionary-column data pages straight through. `ArrowPageWriter` holds the dictionary page (bounded by `dict_page_size_limit`) in memory, since it now arrives last, and emits it first at splice, where the production-order page offsets are rewritten to the dictionary-first layout. Because data pages are now produced before the dictionary page, the chunk's `page_encoding_stats` lists the data-page entry before the dictionary-page entry (the reverse of the previous ordering); the Parquet spec defines this list as an unordered set, so the output stays valid. The column-at-a-time `SerializedFileWriter` path is unchanged (it commits bytes live and still buffers, which is inherent there). This commit also fixes `memory_size()` to report the bytes the writer actually holds *resident* (via `PageStore::memory_size` / `PageWriter::buffered_memory_size`) rather than bytes written, so it drops to ~0 once pages are spilled off-heap.
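The streaming splice from commit 2 can be sketched in isolation. This standalone version mirrors the shape of `StreamingColumnChunkReader` from the diff below, but uses a hypothetical Vec-backed `VecStore` and a `Vec<u8>` plus read position instead of `Bytes::split_to`: dictionary blobs are emitted first, and each data-page blob is taken from the store only when the previous one has been fully read.

```rust
use std::io::{self, Read};
use std::vec::IntoIter;

/// Hypothetical in-memory store: the slot index is the key, and `take`
/// empties the slot so each blob is handed back exactly once.
#[derive(Default)]
pub struct VecStore {
    slots: Vec<Option<Vec<u8>>>,
}

impl VecStore {
    pub fn put(&mut self, blob: Vec<u8>) -> usize {
        self.slots.push(Some(blob));
        self.slots.len() - 1
    }

    pub fn take(&mut self, key: usize) -> io::Result<Vec<u8>> {
        self.slots
            .get_mut(key)
            .and_then(Option::take)
            .ok_or_else(|| io::Error::other("unknown or already-taken key"))
    }
}

/// Dictionary blobs first, then data pages pulled from the store lazily.
pub struct StreamingChunkReader {
    dictionary: IntoIter<Vec<u8>>,
    store: VecStore,
    keys: IntoIter<usize>,
    current: Vec<u8>,
    pos: usize,
}

impl StreamingChunkReader {
    pub fn new(dictionary: Vec<Vec<u8>>, store: VecStore, keys: Vec<usize>) -> Self {
        Self {
            dictionary: dictionary.into_iter(),
            store,
            keys: keys.into_iter(),
            current: Vec::new(),
            pos: 0,
        }
    }
}

impl Read for StreamingChunkReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Refill only once the current blob is drained (skipping empty blobs).
        while self.pos == self.current.len() {
            let next = if let Some(blob) = self.dictionary.next() {
                blob
            } else if let Some(key) = self.keys.next() {
                self.store.take(key)?
            } else {
                return Ok(0);
            };
            // Assigning drops the previous blob, so at most one is held.
            self.current = next;
            self.pos = 0;
        }
        let len = (self.current.len() - self.pos).min(out.len());
        out[..len].copy_from_slice(&self.current[self.pos..self.pos + len]);
        self.pos += len;
        Ok(len)
    }
}
```

Because the reader owns the store and pulls one key at a time, a spilling backend only ever materializes the page currently being copied into the output.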
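The dictionary-first offset rewrite from commit 4 amounts to simple arithmetic. The sketch below is an assumption about the idea, not the PR's `update_dictionary_location` implementation: data-page offsets are recorded in production order starting at 0, and moving a `dict_len`-byte dictionary page to the front shifts each of them by `dict_len`. `ChunkOffsets` and `dictionary_first` are hypothetical names, and offsets here are relative to the chunk start.

```rust
/// Page offsets within one column chunk, relative to the chunk start.
#[derive(Debug, PartialEq)]
pub struct ChunkOffsets {
    pub dictionary_page_offset: Option<u64>,
    pub data_page_offsets: Vec<u64>,
}

/// `production_data_offsets` were recorded while data pages streamed in
/// first (starting at 0); the `dict_len`-byte dictionary page arrived last.
/// Emitting the dictionary first puts it at offset 0 and shifts every data
/// page forward by `dict_len`. A zero `dict_len` means no dictionary page.
pub fn dictionary_first(production_data_offsets: &[u64], dict_len: u64) -> ChunkOffsets {
    if dict_len == 0 {
        return ChunkOffsets {
            dictionary_page_offset: None,
            data_page_offsets: production_data_offsets.to_vec(),
        };
    }
    ChunkOffsets {
        dictionary_page_offset: Some(0),
        data_page_offsets: production_data_offsets.iter().map(|&o| o + dict_len).collect(),
    }
}
```

For example, data pages recorded at `[0, 100, 250]` with a 40-byte dictionary end up at `[40, 140, 290]`, with the dictionary page at 0.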
## Are these changes tested?
Yes:
- A **byte-identical round-trip test** using a custom `PageStore` with sparse, non-contiguous, `HashMap`-backed handles, proving the writer relies only on the opaque-handle contract across dictionary and non-dictionary columns and multiple row groups.
- A **dictionary round-trip test with the offset index disabled**, covering the path where only the chunk-level dictionary/data page offsets are rewritten.
- Unit tests for the in-memory backend contract and its resident-byte reporting.
- An **always-on memory regression test** (`page_store_bounds_write_memory` in `parquet/tests/arrow_writer.rs`, using an in-tree thread-local tracking allocator) measuring peak heap for both a skewed wide row group (~16 MiB) and a low-cardinality dictionary column (~4.2M rows):
| scenario | peak heap, in-memory store | peak heap, temp-file spill |
|---|---|---|
| skewed ~16 MiB row group | ~18.3 MiB | ~4.2 MiB |
| dictionary column, 4.2M rows | ~2.69 MiB | ~0.48 MiB |
That is, the spilling backend bounds peak write memory by the in-flight encoder/dictionary buffers rather than by the row group size, for both the page buffer and the dictionary-column data pages.
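The regression test measures peak heap with an in-tree tracking allocator. As a simplified sketch of that technique (not the PR's code), the version below wraps `std::alloc::System` and keeps process-wide counters in atomics, whereas the PR's tracker is thread-local, so allocations from other threads are included here. `PeakTrackingAlloc`, `reset_peak`, and `peak` are hypothetical names.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Wraps the system allocator and records live and peak heap bytes.
/// Process-wide (atomics), unlike the PR's thread-local tracker.
pub struct PeakTrackingAlloc;

static CURRENT: AtomicUsize = AtomicUsize::new(0);
static PEAK: AtomicUsize = AtomicUsize::new(0);

unsafe impl GlobalAlloc for PeakTrackingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            // Bump the live count and raise the high-water mark if needed.
            let now = CURRENT.fetch_add(layout.size(), Ordering::Relaxed) + layout.size();
            PEAK.fetch_max(now, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
        CURRENT.fetch_sub(layout.size(), Ordering::Relaxed);
    }
    // `alloc_zeroed` and `realloc` keep the trait defaults, which route
    // through `alloc`/`dealloc` above, so they are counted too.
}

#[global_allocator]
static GLOBAL: PeakTrackingAlloc = PeakTrackingAlloc;

/// Start a new measurement window: the peak restarts from current usage.
pub fn reset_peak() {
    PEAK.store(CURRENT.load(Ordering::Relaxed), Ordering::Relaxed);
}

/// Highest live heap usage observed since the last `reset_peak`.
pub fn peak() -> usize {
    PEAK.load(Ordering::Relaxed)
}
```

A test would call `reset_peak()`, run the write, and assert that `peak()` minus the starting usage stays under a budget; a thread-local variant avoids noise from other test threads, which is presumably why the PR uses one.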
## Are there any user-facing changes?
New, additive public API (default behavior unchanged):
- `ArrowWriterOptions::with_page_store_factory`
- `PageStore`, `PageKey`, `PageStoreArgs`, `PageStoreFactory`, `InMemoryPageStore`, `InMemoryPageStoreFactory` (re-exported from `parquet::arrow::arrow_writer`, defined in `parquet::column::page_store`).
- New defaulted `PageWriter` trait methods `defers_dictionary_ordering()` and `buffered_memory_size()` (both default to the previous behavior and are `#[doc(hidden)]`, i.e. unstable), and a defaulted `PageStore::memory_size()`.
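A hedged, standalone sketch of why defaulted trait methods keep this change additive, and of the decision commit 4 adds to the column writer. `PageSink`, `LiveSink`, `DeferringSink`, and `write_dictionary_column` are hypothetical stand-ins for `PageWriter`, the file-backed and Arrow page writers, and `GenericColumnWriter`: an implementor written before the new methods existed compiles unchanged and keeps the old ordering, while a sink that buffers the whole chunk opts in and receives the dictionary page last.

```rust
/// Hypothetical stand-in for a page-writer trait that gains new methods.
pub trait PageSink {
    /// Accepts one serialized page (header and data already concatenated).
    fn write_page(&mut self, page: Vec<u8>);

    /// Added later with a default that preserves the old behavior, so
    /// implementors written before it existed still compile unchanged.
    fn defers_dictionary_ordering(&self) -> bool {
        false
    }

    /// Added later; a sink that streams pages straight out retains nothing.
    fn buffered_memory_size(&self) -> usize {
        0
    }
}

/// Commits bytes "live", so the dictionary page must reach it first.
#[derive(Default)]
pub struct LiveSink {
    pub pages: Vec<Vec<u8>>,
}

impl PageSink for LiveSink {
    fn write_page(&mut self, page: Vec<u8>) {
        self.pages.push(page);
    }
}

/// Buffers the whole chunk and reorders at flush, so it opts in.
#[derive(Default)]
pub struct DeferringSink {
    pub pages: Vec<Vec<u8>>,
}

impl PageSink for DeferringSink {
    fn write_page(&mut self, page: Vec<u8>) {
        self.pages.push(page);
    }
    fn defers_dictionary_ordering(&self) -> bool {
        true
    }
    fn buffered_memory_size(&self) -> usize {
        self.pages.iter().map(Vec::len).sum()
    }
}

/// Simulates writing one dictionary-encoded chunk. Returns how many data
/// pages the column writer itself would have had to hold back in memory.
pub fn write_dictionary_column(
    sink: &mut dyn PageSink,
    data_pages: Vec<Vec<u8>>,
    dictionary: Vec<u8>,
) -> usize {
    if sink.defers_dictionary_ordering() {
        // Stream data pages as they are produced; the dictionary comes last.
        for page in data_pages {
            sink.write_page(page);
        }
        sink.write_page(dictionary);
        0
    } else {
        // Hold every data page until the dictionary is final, then emit it first.
        let held = data_pages.len();
        sink.write_page(dictionary);
        for page in data_pages {
            sink.write_page(page);
        }
        held
    }
}
```

In the real crate these `PageWriter` hooks are `#[doc(hidden)]` and marked unstable, so this only illustrates the pattern, not an extension point for external writers.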
## Not covered (by design)
- The **column-at-a-time `SerializedFileWriter` path** still buffers dictionary-column data pages: it commits bytes to the file live, so the dictionary-first ordering must be resolved during encoding. That path otherwise already has minimal memory overhead.
- The in-flight **encoder buffer** and **dictionary** themselves stay resident (already bounded by the page/dict size limits), as do **bloom filters**.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
parquet/src/arrow/arrow_writer/mod.rs | 439 ++++++++++++++++++++++++++++++----
parquet/src/column/mod.rs | 1 +
parquet/src/column/page.rs | 58 +++++
parquet/src/column/page_store.rs | 257 ++++++++++++++++++++
parquet/src/column/writer/mod.rs | 53 +++-
parquet/src/file/writer.rs | 26 +-
parquet/tests/arrow_writer.rs | 343 +++++++++++++++++++++++++-
7 files changed, 1127 insertions(+), 50 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 86e6cf081f..21650fe26e 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -21,7 +21,6 @@ use crate::column::chunker::ContentDefinedChunker;
use bytes::Bytes;
use std::io::{Read, Write};
-use std::iter::Peekable;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
use std::vec::IntoIter;
@@ -37,6 +36,7 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr
use crate::arrow::ArrowSchemaConverter;
use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::basic::PageType;
use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
use crate::column::page_encryption::PageEncryptor;
use crate::column::writer::encoder::ColumnValueEncoder;
@@ -49,7 +49,6 @@ use crate::encryption::encrypt::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
-use crate::file::reader::{ChunkReader, Length};
use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -58,6 +57,12 @@ use levels::{ArrayLevels, calculate_array_levels};
mod byte_array;
mod levels;
+#[doc(inline)]
+pub use crate::column::page_store::{
+ InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
+ PageStoreFactory,
+};
+
/// Encodes [`RecordBatch`] to parquet
///
/// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
@@ -263,8 +268,12 @@ impl<W: Write + Send> ArrowWriter<W> {
let file_writer =
SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
- let row_group_writer_factory =
+ let mut row_group_writer_factory =
ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
+ if let Some(page_store_factory) = options.page_store_factory {
+ row_group_writer_factory =
+ row_group_writer_factory.with_page_store_factory(page_store_factory);
+ }
let cdc_chunkers = props_ptr
.content_defined_chunking()
@@ -556,6 +565,7 @@ pub struct ArrowWriterOptions {
skip_arrow_metadata: bool,
schema_root: Option<String>,
schema_descr: Option<SchemaDescriptor>,
+ page_store_factory: Option<Arc<dyn PageStoreFactory>>,
}
impl ArrowWriterOptions {
@@ -569,6 +579,90 @@ impl ArrowWriterOptions {
Self { properties, ..self }
}
+ /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
+ /// group is being written.
+ ///
+ /// By default (an [`InMemoryPageStore`] per column chunk) completed pages
+ /// are buffered on the heap until the row group is flushed, so peak memory
+ /// grows with the row group size. Supplying a factory that spills to a temp
+ /// file or object storage instead bounds peak write memory, decoupling it
+ /// from the row group size while keeping large, read-optimal row groups.
+ ///
+ /// # Example: a custom [`PageStore`]
+ ///
+ /// A store only has to map an opaque, store-allocated [`PageKey`] to a blob
+ /// and hand the blob back once. The keys need not be dense or sequential —
+ /// here a `HashMap`-backed store mints sparse handles, proving the writer
+ /// relies only on the opaque-handle contract. A real spilling backend would
+ /// write the bytes to a temp file in `put` and read them back in `take`.
+ ///
+ /// ```
+ /// # use std::collections::HashMap;
+ /// # use std::sync::Arc;
+ /// # use bytes::Bytes;
+ /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+ /// # use parquet::arrow::arrow_writer::{
+ /// # ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
+ /// # };
+ /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
+ /// # use parquet::errors::{ParquetError, Result};
+ /// #[derive(Default)]
+ /// struct MapPageStore {
+ /// blobs: HashMap<u64, Bytes>,
+ /// next: u64,
+ /// }
+ ///
+ /// impl PageStore for MapPageStore {
+ /// fn put(&mut self, value: Bytes) -> Result<PageKey> {
+ /// // Mint a sparse handle (every other integer) to show the writer
+ /// // never assumes anything about the key's value.
+ /// let key = PageKey::new(self.next);
+ /// self.next += 2;
+ /// self.blobs.insert(key.get(), value);
+ /// Ok(key)
+ /// }
+ ///
+ /// fn take(&mut self, key: PageKey) -> Result<Bytes> {
+ /// self.blobs
+ /// .remove(&key.get())
+ /// .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get())))
+ /// }
+ /// }
+ ///
+ /// #[derive(Debug)]
+ /// struct MapPageStoreFactory;
+ ///
+ /// impl PageStoreFactory for MapPageStoreFactory {
+ /// fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+ /// // `args` exposes the column index and descriptor (physical/logical
+ /// // type, path), so a real backend could spill only large columns.
+ /// let _ = (args.column_index(), args.column_descriptor());
+ /// Ok(Box::new(MapPageStore::default()))
+ /// }
+ /// }
+ ///
+ /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
+ /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+ ///
+ /// let options =
+ /// ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory));
+ /// let mut buffer = Vec::new();
+ /// let mut writer =
+ /// ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
+ /// writer.write(&to_write).unwrap();
+ /// writer.close().unwrap();
+ ///
+ /// // The file is byte-identical to one written with the default store.
+ /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
+ /// assert_eq!(to_write, reader.next().unwrap().unwrap());
+ /// ```
+ pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
+ Self {
+ page_store_factory: Some(page_store_factory),
+ ..self
+ }
+ }
+
/// Skip encoding the embedded arrow metadata (defaults to `false`)
///
/// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
@@ -603,52 +697,108 @@ impl ArrowWriterOptions {
}
}
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
struct ArrowColumnChunkData {
length: usize,
- data: Vec<Bytes>,
+ store: Box<dyn PageStore>,
+ keys: Vec<PageKey>,
+ /// The dictionary page's serialized blobs (header ‖ data), held in memory
+ /// rather than the store.
+ ///
+ /// A dictionary page is produced at most once and bounded by
+ /// `dict_page_size_limit`, but it must be written *first* in the chunk even
+ /// though the data pages reach the writer before it (see
+ /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
+ /// round-trip ~1 page to the backend and straight back, so it is kept here
+ /// and emitted ahead of the data pages at splice. Empty for non-dictionary
+ /// columns.
+ dictionary: Vec<Bytes>,
}
-impl Length for ArrowColumnChunkData {
- fn len(&self) -> u64 {
- self.length as _
+impl ArrowColumnChunkData {
+ fn new(store: Box<dyn PageStore>) -> Self {
+ Self {
+ length: 0,
+ store,
+ keys: Vec::new(),
+ dictionary: Vec::new(),
+ }
}
-}
-impl ChunkReader for ArrowColumnChunkData {
- type T = ArrowColumnChunkReader;
+ /// Append a data-page blob to the store, recording its handle in write
+ /// order.
+ fn push(&mut self, value: Bytes) -> Result<()> {
+ let key = self.store.put(value)?;
+ self.keys.push(key);
+ Ok(())
+ }
- fn get_read(&self, start: u64) -> Result<Self::T> {
- assert_eq!(start, 0); // Assume append_column writes all data in
one-shot
- Ok(ArrowColumnChunkReader(
- self.data.clone().into_iter().peekable(),
- ))
+ /// Retain a dictionary-page blob in memory (emitted first at splice).
+ fn push_dictionary(&mut self, value: Bytes) {
+ self.dictionary.push(value);
}
- fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
- unimplemented!()
+ /// Total serialized size of the in-memory dictionary page, in bytes.
+ fn dictionary_len(&self) -> usize {
+ self.dictionary.iter().map(Bytes::len).sum()
}
+
+ /// Bytes this chunk currently holds on the heap: whatever the store keeps
+ /// resident (zero for a spilling backend) plus the in-memory dictionary
+ /// page.
+ fn memory_size(&self) -> usize {
+ self.store.memory_size() + self.dictionary_len()
+ }
+}
+
+/// A streaming [`Read`] over one column chunk's buffered pages, in final file
+/// order: the in-memory dictionary page (if any) first, then the data pages.
+///
+/// Each data-page blob is taken back out of the [`PageStore`] *as it is
+/// consumed* and released immediately afterwards, so splicing a chunk into the
+/// output file never materializes more than a single page in memory at a time.
+/// This is what keeps the splice phase within the memory bound for a spilling
+/// backend (an in-memory store already holds the bytes, so it is unaffected).
+struct StreamingColumnChunkReader {
+ /// Dictionary-page blobs, emitted before any data page.
+ dictionary: IntoIter<Bytes>,
+ store: Box<dyn PageStore>,
+ keys: IntoIter<PageKey>,
+ /// The blob currently being drained into the output; emptied as it is read.
+ current: Bytes,
}
-/// A [`Read`] for [`ArrowColumnChunkData`]
-struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
+impl StreamingColumnChunkReader {
+ fn new(data: ArrowColumnChunkData) -> Self {
+ Self {
+ dictionary: data.dictionary.into_iter(),
+ store: data.store,
+ keys: data.keys.into_iter(),
+ current: Bytes::new(),
+ }
+ }
+}
-impl Read for ArrowColumnChunkReader {
+impl Read for StreamingColumnChunkReader {
fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
- let buffer = loop {
- match self.0.peek_mut() {
- Some(b) if b.is_empty() => {
- self.0.next();
- continue;
- }
- Some(b) => break b,
- None => return Ok(0),
+ // Refill from the next blob whenever the current one is drained: the
+ // dictionary page first, then each data page from the store.
+ while self.current.is_empty() {
+ if let Some(blob) = self.dictionary.next() {
+ self.current = blob;
+ } else if let Some(key) = self.keys.next() {
+ self.current = self.store.take(key).map_err(std::io::Error::other)?;
+ } else {
+ return Ok(0);
}
- };
+ }
- let len = buffer.len().min(out.len());
- let b = buffer.split_to(len);
+ let len = self.current.len().min(out.len());
+ let b = self.current.split_to(len);
out[..len].copy_from_slice(&b);
Ok(len)
}
@@ -660,7 +810,6 @@ impl Read for ArrowColumnChunkReader {
/// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
-#[derive(Default)]
struct ArrowPageWriter {
buffer: SharedColumnChunk,
#[cfg(feature = "encryption")]
@@ -668,6 +817,15 @@ struct ArrowPageWriter {
}
impl ArrowPageWriter {
+ /// Create a page writer that buffers completed pages in `store`.
+ fn new(store: Box<dyn PageStore>) -> Self {
+ Self {
+ buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
+ #[cfg(feature = "encryption")]
+ page_encryptor: None,
+ }
+ }
+
#[cfg(feature = "encryption")]
pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
self.page_encryptor = page_encryptor;
@@ -726,12 +884,33 @@ impl PageWriter for ArrowPageWriter {
spec.bytes_written = compressed_size as u64;
buf.length += compressed_size;
- buf.data.push(header);
- buf.data.push(data);
+ if spec.page_type == PageType::DICTIONARY_PAGE {
+ // Held in memory and emitted first at splice — see
+ // `ArrowColumnChunkData::dictionary`.
+ buf.push_dictionary(header);
+ buf.push_dictionary(data);
+ } else {
+ buf.push(header)?;
+ buf.push(data)?;
+ }
Ok(spec)
}
+ fn defers_dictionary_ordering(&self) -> bool {
+ // The Arrow chunk is buffered in full and spliced at row-group flush, so
+ // data pages may be accepted before the dictionary page and reordered
+ // then. This lets `GenericColumnWriter` stream dictionary-column data
+ // pages straight through instead of buffering them in memory.
+ true
+ }
+
+ fn buffered_memory_size(&self) -> usize {
+ // Only what is actually resident: a spilling store reports ~0 here even
+ // though the chunk's bytes have all passed through it.
+ self.buffer.try_lock().unwrap().memory_size()
+ }
+
fn close(&mut self) -> Result<()> {
Ok(())
}
@@ -785,12 +964,21 @@ impl ArrowColumnChunk {
&mut self.close
}
- /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
+ /// Splices this column's buffered pages into the row group, streaming them
+ /// back out of the [`PageStore`] one page at a time.
pub fn append_to_row_group<W: Write + Send>(
self,
writer: &mut SerializedRowGroupWriter<'_, W>,
) -> Result<()> {
- writer.append_column(&self.data, self.close)
+ let ArrowColumnChunk { data, close } = self;
+
+ // The dictionary page is produced *after* the data pages on this path (so
+ // they can stream straight through) but must be written *first*, so move
+ // it ahead of the data pages in the recorded offsets before the splice.
+ let close = close.update_dictionary_location(data.dictionary_len())?;
+
+ let reader = StreamingColumnChunkReader::new(data);
+ writer.append_column_from_read(reader, close)
}
}
@@ -1082,6 +1270,7 @@ pub struct ArrowRowGroupWriterFactory {
schema: SchemaDescPtr,
arrow_schema: SchemaRef,
props: WriterPropertiesPtr,
+ page_store_factory: Arc<dyn PageStoreFactory>,
#[cfg(feature = "encryption")]
file_encryptor: Option<Arc<FileEncryptor>>,
}
@@ -1098,11 +1287,23 @@ impl ArrowRowGroupWriterFactory {
schema,
arrow_schema,
props,
+ page_store_factory: Arc::new(InMemoryPageStoreFactory),
#[cfg(feature = "encryption")]
file_encryptor: file_writer.file_encryptor(),
}
}
+ /// Set the [`PageStoreFactory`] used to allocate the buffer for each column
+ /// chunk, e.g. to spill completed pages to a temp file or object storage
+ /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`].
+ pub fn with_page_store_factory(
+ mut self,
+ page_store_factory: Arc<dyn PageStoreFactory>,
+ ) -> Self {
+ self.page_store_factory = page_store_factory;
+ self
+ }
+
fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
let writers = self.create_column_writers(row_group_index)?;
Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
@@ -1127,12 +1328,13 @@ impl ArrowRowGroupWriterFactory {
#[cfg(feature = "encryption")]
fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
ArrowColumnWriterFactory::new()
+ .with_page_store_factory(self.page_store_factory.clone())
.with_file_encryptor(row_group_idx, self.file_encryptor.clone())
}
#[cfg(not(feature = "encryption"))]
fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
- ArrowColumnWriterFactory::new()
+ ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
}
}
@@ -1159,6 +1361,8 @@ pub fn get_column_writers(
/// Creates [`ArrowColumnWriter`] instances
struct ArrowColumnWriterFactory {
+ /// Allocates the per-column-chunk [`PageStore`] backing each page writer.
+ page_store_factory: Arc<dyn PageStoreFactory>,
#[cfg(feature = "encryption")]
row_group_index: usize,
#[cfg(feature = "encryption")]
@@ -1168,6 +1372,7 @@ struct ArrowColumnWriterFactory {
impl ArrowColumnWriterFactory {
pub fn new() -> Self {
Self {
+ page_store_factory: Arc::new(InMemoryPageStoreFactory),
#[cfg(feature = "encryption")]
row_group_index: 0,
#[cfg(feature = "encryption")]
@@ -1175,6 +1380,15 @@ impl ArrowColumnWriterFactory {
}
}
+ /// Use `page_store_factory` to allocate the buffer for each column chunk.
+ pub fn with_page_store_factory(
+ mut self,
+ page_store_factory: Arc<dyn PageStoreFactory>,
+ ) -> Self {
+ self.page_store_factory = page_store_factory;
+ self
+ }
+
#[cfg(feature = "encryption")]
pub fn with_file_encryptor(
mut self,
@@ -1199,18 +1413,22 @@ impl ArrowColumnWriterFactory {
column_index,
&column_path,
)?;
+ let args = PageStoreArgs::new(column_index, column_descriptor);
+ let store = self.page_store_factory.create(&args)?;
Ok(Box::new(
- ArrowPageWriter::default().with_encryptor(page_encryptor),
+ ArrowPageWriter::new(store).with_encryptor(page_encryptor),
))
}
#[cfg(not(feature = "encryption"))]
fn create_page_writer(
&self,
- _column_descriptor: &ColumnDescPtr,
- _column_index: usize,
+ column_descriptor: &ColumnDescPtr,
+ column_index: usize,
) -> Result<Box<ArrowPageWriter>> {
- Ok(Box::<ArrowPageWriter>::default())
+ let args = PageStoreArgs::new(column_index, column_descriptor);
+ let store = self.page_store_factory.create(&args)?;
+ Ok(Box::new(ArrowPageWriter::new(store)))
}
/// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
@@ -1738,6 +1956,141 @@ mod tests {
statistics::Statistics,
};
+ /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps
+ /// blobs in a `HashMap` — nothing like the default `Vec<Bytes>`. Used to
+ /// prove the writer relies only on the opaque-handle contract and never on
+ /// handles being dense `Vec` indices. Records how many blobs were stored.
+ #[derive(Debug, Default)]
+ struct RecordingPageStore {
+ next: u64,
+ blobs: HashMap<u64, Bytes>,
+ puts: Arc<std::sync::atomic::AtomicUsize>,
+ }
+
+ impl PageStore for RecordingPageStore {
+ fn put(&mut self, value: Bytes) -> Result<PageKey> {
+ // Deliberately non-sequential, never-zero handles.
+ let id = 100 + self.next * 7;
+ self.next += 1;
+ self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ self.blobs.insert(id, value);
+ Ok(PageKey::new(id))
+ }
+
+ fn take(&mut self, key: PageKey) -> Result<Bytes> {
+ self.blobs
+ .remove(&key.get())
+ .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
+ }
+ }
+
+ #[derive(Debug)]
+ struct RecordingPageStoreFactory {
+ puts: Arc<std::sync::atomic::AtomicUsize>,
+ }
+
+ impl PageStoreFactory for RecordingPageStoreFactory {
+ fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+ Ok(Box::new(RecordingPageStore {
+ puts: self.puts.clone(),
+ ..Default::default()
+ }))
+ }
+ }
+
+ /// A custom [`PageStore`] must produce byte-identical files to the in-memory
+ /// default, across dictionary and non-dictionary columns and multiple row
+ /// groups (so multiple store instances are exercised).
+ #[test]
+ fn custom_page_store_is_byte_identical_to_default() {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("i", DataType::Int32, true),
+ // A low-cardinality string column to exercise the dictionary path.
+ Field::new("s", DataType::Utf8, true),
+ ]));
+ let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
+ let s = StringArray::from(vec![
+ Some("a"),
+ Some("bb"),
+ Some("a"),
+ None,
+ Some("bb"),
+ Some("ccc"),
+ ]);
+ let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
+
+ // Small row groups so multiple column chunks (hence multiple store
+ // instances) are produced.
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(Some(3))
+ .build();
+
+ let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
+ let mut buffer = Vec::new();
+ let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
+ if let Some(factory) = factory {
+ opts = opts.with_page_store_factory(factory);
+ }
+ let mut writer =
+ ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+ buffer
+ };
+
+ let default_bytes = write(None);
+
+ let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+ let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
+ puts: puts.clone(),
+ })));
+
+ assert!(
+ puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
+ "custom PageStore was never written to"
+ );
+ assert_eq!(
+ default_bytes, custom_bytes,
+ "a custom PageStore must produce byte-identical output to the default"
+ );
+ }
+
+ /// A dictionary-encoded column written through the deferred-ordering Arrow
+ /// path must round-trip correctly even with the offset index disabled, when
+ /// only the chunk-level dictionary/data page offsets are rewritten (there is
+ /// no offset index to rebuild). Spans multiple data pages so the
+ /// dictionary-first reordering is exercised.
+ #[test]
+ fn dictionary_column_round_trips_with_offset_index_disabled() {
+ let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
+
+ // Low cardinality so the column stays dictionary-encoded; enough rows to
+ // span several data pages within a single row group.
+ let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
+ let array = Int32Array::from(values.clone());
+ let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_offset_index_disabled(true)
+ .set_data_page_row_count_limit(4096)
+ .build();
+ let opts = ArrowWriterOptions::new().with_properties(props);
+
+ let mut buffer = Vec::new();
+ let mut writer =
+ ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
+ let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
+ let read_values: Vec<Option<i32>> = read
+ .iter()
+ .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
+ .collect();
+ assert_eq!(read_values, values);
+ }
+
#[test]
fn arrow_writer() {
// define schema
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index 115c8dd01b..9c7e77d29c 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -125,5 +125,6 @@ pub(crate) mod page_encryption;
#[cfg(not(feature = "encryption"))]
#[path = "page_encryption_disabled.rs"]
pub(crate) mod page_encryption;
+pub mod page_store;
pub mod reader;
pub mod writer;
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 4cfc07a028..ed80e279a0 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -197,6 +197,15 @@ impl CompressedPage {
self.compressed_page.buffer()
}
+ /// Returns the number of heap bytes this page currently holds.
+ ///
+ /// This is the page's compressed buffer (the embedded [`Bytes`]); use it to
+ /// account for a buffered page's memory footprint rather than reaching for
+ /// `data().len()` at each call site.
+ pub fn memory_usage(&self) -> usize {
+ self.compressed_page.buffer().len()
+ }
+
/// Returns the thrift page header
pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
let uncompressed_size = self.uncompressed_size();
@@ -430,6 +439,55 @@ pub trait PageWriter: Send {
/// either data page or dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
+ /// **Unstable, not public API.** This is an internal protocol between
+ /// [`GenericColumnWriter`] and its in-crate page writers; it is hidden from
+ /// the rendered docs and may change or be removed without a major version
+ /// bump. External `PageWriter` implementations should not override it. See
+ /// the page-spilling cleanup tracked in
+ /// <https://github.com/apache/arrow-rs/pull/10020>.
+ ///
+ /// Whether this writer resolves the final page layout itself (at flush)
+ /// rather than committing bytes to their final position as pages arrive.
+ ///
+ /// The dictionary page of a column chunk must be written *first*, but it is
+ /// not finalized until every value has been seen. A writer that commits
+ /// bytes live (e.g. straight to a file) therefore relies on the column
+ /// writer buffering the dictionary-encoded data pages in memory until the
+ /// dictionary page is ready — see [`GenericColumnWriter`]'s `data_pages`.
+ ///
+ /// A writer that instead buffers the whole chunk and splices it later (the
+ /// [`ArrowWriter`] path) can accept data pages *before* the dictionary page
+ /// and order them itself at flush. Returning `true` tells the column writer
+ /// to skip that in-memory buffering and stream dictionary-column data pages
+ /// straight through, bounding the column writer's memory.
+ ///
+ /// [`GenericColumnWriter`]: crate::column::writer::GenericColumnWriter
+ /// [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+ #[doc(hidden)]
+ fn defers_dictionary_ordering(&self) -> bool {
+ false
+ }
+
+ /// **Unstable, not public API.** Companion to
+ /// [`defers_dictionary_ordering`](Self::defers_dictionary_ordering): an
+ /// internal hook for the column writer's memory accounting, hidden from the
+ /// rendered docs and subject to change or removal without a major version
+ /// bump. External `PageWriter` implementations should not override it.
+ ///
+ /// The number of bytes this writer is currently holding **in memory** for
+ /// pages it has been handed (i.e. completed pages not yet committed to their
+ /// final destination).
+ ///
+ /// Used by the column writer to report its memory footprint. The default is
+ /// `0`: a writer that streams pages straight to their destination retains
+ /// nothing. A writer that buffers pages should report what it actually holds
+ /// on the heap — which, when it spills to a backing store, can be far less
+ /// than the bytes written.
+ #[doc(hidden)]
+ fn buffered_memory_size(&self) -> usize {
+ 0
+ }
+
/// Closes resources and flushes underlying sink.
/// Page writer should not be used after this method is called.
fn close(&mut self) -> Result<()>;
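The buffering decision that `defers_dictionary_ordering` and `buffered_memory_size` control can be modelled with the standard library alone. The sketch below is a simplified stand-in, not the parquet API: `Page`, `PageSink`, `Sink` (with its `defer` flag) and the toy `ColumnWriter` are hypothetical names. A sink that commits bytes live forces the column writer to hold dictionary-column data pages until the dictionary page is final; a sink that orders pages itself at flush lets them stream straight through, so the bytes are accounted for by the sink instead.

```rust
use std::collections::VecDeque;

/// A simplified page: its kind plus encoded bytes (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
enum Page {
    Dictionary(Vec<u8>),
    Data(Vec<u8>),
}

fn page_len(page: &Page) -> usize {
    match page {
        Page::Dictionary(bytes) | Page::Data(bytes) => bytes.len(),
    }
}

/// Hypothetical stand-in for `PageWriter`, reduced to the two hooks above.
/// The defaults mirror the real ones: commit bytes live, retain nothing.
trait PageSink {
    fn write_page(&mut self, page: Page);
    fn defers_dictionary_ordering(&self) -> bool {
        false
    }
    fn buffered_memory_size(&self) -> usize {
        0
    }
}

/// A sink that either commits pages live (`defer == false`, as if writing
/// straight to a file) or buffers the chunk and fixes the order at flush.
struct Sink {
    defer: bool,
    pages: Vec<Page>,
}

impl PageSink for Sink {
    fn write_page(&mut self, page: Page) {
        self.pages.push(page);
    }
    fn defers_dictionary_ordering(&self) -> bool {
        self.defer
    }
    fn buffered_memory_size(&self) -> usize {
        // A live sink has already committed its bytes; a deferring one holds them.
        if self.defer {
            self.pages.iter().map(page_len).sum::<usize>()
        } else {
            0
        }
    }
}

impl Sink {
    /// Final chunk layout. A deferring sink moves the dictionary page to the
    /// front here (the sort is stable, so data pages keep their order).
    fn flush(mut self) -> Vec<Page> {
        if self.defer {
            self.pages.sort_by_key(|page| !matches!(page, Page::Dictionary(_)));
        }
        self.pages
    }
}

/// Toy writer for a dictionary-encoded column. `data_pages` holds data pages
/// back until the dictionary page is final.
struct ColumnWriter<S: PageSink> {
    sink: S,
    data_pages: VecDeque<Page>,
}

impl<S: PageSink> ColumnWriter<S> {
    fn new(sink: S) -> Self {
        Self { sink, data_pages: VecDeque::new() }
    }

    fn add_data_page(&mut self, bytes: Vec<u8>) {
        let page = Page::Data(bytes);
        if self.sink.defers_dictionary_ordering() {
            self.sink.write_page(page); // the sink orders pages at flush: stream through
        } else {
            self.data_pages.push_back(page); // the sink commits live: hold until close
        }
    }

    /// Pages held here plus whatever the sink keeps resident.
    fn memory_size(&self) -> usize {
        self.data_pages.iter().map(page_len).sum::<usize>() + self.sink.buffered_memory_size()
    }

    /// The dictionary is final only now: write it, then any held data pages.
    fn close(mut self, dictionary: Vec<u8>) -> S {
        self.sink.write_page(Page::Dictionary(dictionary));
        while let Some(page) = self.data_pages.pop_front() {
            self.sink.write_page(page);
        }
        self.sink
    }
}
```

In the deferring case the bytes still sit in this toy sink's `Vec`; the point of the real change is that such a sink can hand them to a `PageStore` and report only what remains on the heap.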
diff --git a/parquet/src/column/page_store.rs b/parquet/src/column/page_store.rs
new file mode 100644
index 0000000000..4f821c0e5c
--- /dev/null
+++ b/parquet/src/column/page_store.rs
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous in the file while record batches arrive with all columns interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescriptor;
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store and are only meaningful to the store
+/// that produced them (the default [`InMemoryPageStore`] mints them densely and
+/// sequentially). The caller treats them as opaque tokens.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(u64);
+
+impl PageKey {
+ /// Create a handle wrapping `raw`.
+ ///
+ /// A [`PageStore`] implementation calls this to mint the handle it returns
+ /// from [`put`](PageStore::put). The value is opaque to the caller, so a
+ /// store is free to use a dense counter, a packed locator, or anything else
+ /// it can later resolve in [`take`](PageStore::take).
+ pub const fn new(raw: u64) -> Self {
+ Self(raw)
+ }
+
+ /// The raw value passed to [`new`](Self::new).
+ pub const fn get(self) -> u64 {
+ self.0
+ }
+}
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (`put` and `take` both take `&mut self`), so it needs no
+/// internal synchronization; hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+ /// Store `value`, returning a handle that can later be passed to
+ /// [`take`](Self::take).
+ fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+ /// Take back the blob previously stored under `key`.
+ ///
+ /// The caller takes ownership of the returned bytes and will **not** request
+ /// `key` again, so the store may release any resources backing it — eagerly
+ /// here, or when the store is dropped.
+ fn take(&mut self, key: PageKey) -> Result<Bytes>;
+
+ /// The number of bytes this store currently holds **in memory** (resident
+ /// on the heap), used to report the writer's memory footprint.
+ ///
+ /// The default is `0`, which is exactly right for a backend that moves
+ /// every blob off-heap (a temp file, object storage): the bytes it has been
+ /// handed no longer occupy heap. The in-memory backend overrides this to
+ /// report its resident blobs. A backend that keeps a partial in-memory
+ /// buffer should report that buffer's size.
+ fn memory_size(&self) -> usize {
+ 0
+ }
+}
+
+/// Context for a single [`PageStoreFactory::create`] call.
+///
+/// Describes the leaf column chunk the store will buffer. It is held by
+/// reference for the duration of the call; a backend reads only what it needs.
+/// More fields may be added in future releases without breaking existing
+/// implementations — the type is constructed only by the writer, so an
+/// implementer only ever receives one and calls its accessors.
+pub struct PageStoreArgs<'a> {
+ column_index: usize,
+ column_descriptor: &'a ColumnDescriptor,
+}
+
+impl<'a> PageStoreArgs<'a> {
+ // Constructed only by the Arrow writer; without that feature there is no caller.
+ #[cfg(feature = "arrow")]
+ pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
+ Self {
+ column_index,
+ column_descriptor,
+ }
+ }
+
+ /// Index of the leaf column within the row group.
+ ///
+ /// A backend may use this to e.g. name spill files or shard across a bounded
+ /// pool; it carries no ordering or coordination requirement.
+ pub fn column_index(&self) -> usize {
+ self.column_index
+ }
+
+ /// Descriptor for the leaf column: physical/logical type, path, and max
+ /// definition/repetition levels.
+ ///
+ /// Lets a backend tailor buffering to the column — for example spilling only
+ /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the
+ /// heap.
+ pub fn column_descriptor(&self) -> &ColumnDescriptor {
+ self.column_descriptor
+ }
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+ /// Create a new, empty [`PageStore`] for the leaf column described by `args`.
+ fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
+}
+
+/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
+///
+/// Peak memory grows with the row group size; use a spilling backend to bound
+/// it.
+#[derive(Debug, Default)]
+pub struct InMemoryPageStore {
+ blobs: Vec<Bytes>,
+ /// Running total of resident blob bytes, kept in step with `put`/`take`.
+ resident: usize,
+}
+
+impl PageStore for InMemoryPageStore {
+ fn put(&mut self, value: Bytes) -> Result<PageKey> {
+ let key = PageKey(self.blobs.len() as u64);
+ self.resident += value.len();
+ self.blobs.push(value);
+ Ok(key)
+ }
+
+ fn take(&mut self, key: PageKey) -> Result<Bytes> {
+ // Replace the slot with an empty `Bytes` so the stored blob is released
+ // as soon as it is taken, keeping memory bounded while the chunk is
+ // streamed into the output file.
+ let blob = self
+ .blobs
+ .get_mut(key.0 as usize)
+ .map(std::mem::take)
+ .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
+ self.resident -= blob.len();
+ Ok(blob)
+ }
+
+ fn memory_size(&self) -> usize {
+ self.resident
+ }
+}
+
+/// Factory for [`InMemoryPageStore`] — the default used by
+/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter).
+#[derive(Debug, Default)]
+pub struct InMemoryPageStoreFactory;
+
+impl PageStoreFactory for InMemoryPageStoreFactory {
+ fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+ Ok(Box::new(InMemoryPageStore::default()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn in_memory_round_trips_blobs_in_handle_order() {
+ let mut store = InMemoryPageStore::default();
+ let k0 = store.put(Bytes::from_static(b"hello")).unwrap();
+ let k1 = store.put(Bytes::from_static(b"world")).unwrap();
+ assert_ne!(k0, k1);
+ assert_eq!(&store.take(k0).unwrap()[..], b"hello");
+ assert_eq!(&store.take(k1).unwrap()[..], b"world");
+ }
+
+ #[test]
+ fn in_memory_take_releases_the_slot() {
+ let mut store = InMemoryPageStore::default();
+ let k = store.put(Bytes::from_static(b"abc")).unwrap();
+ assert_eq!(&store.take(k).unwrap()[..], b"abc");
+ // A second take yields the emptied placeholder rather than the blob,
+ // confirming the bytes were released on the first take.
+ assert!(store.take(k).unwrap().is_empty());
+ }
+
+ #[test]
+ fn in_memory_invalid_key_errors() {
+ let mut store = InMemoryPageStore::default();
+ assert!(store.take(PageKey(99)).is_err());
+ }
+
+ #[test]
+ fn in_memory_reports_resident_bytes() {
+ let mut store = InMemoryPageStore::default();
+ assert_eq!(store.memory_size(), 0);
+ let k0 = store.put(Bytes::from_static(b"hello")).unwrap();
+ let k1 = store.put(Bytes::from_static(b"!")).unwrap();
+ assert_eq!(store.memory_size(), 6);
+ store.take(k0).unwrap();
+ assert_eq!(store.memory_size(), 1);
+ store.take(k1).unwrap();
+ assert_eq!(store.memory_size(), 0);
+ }
+
+ #[test]
+ fn default_store_memory_size_is_zero() {
+ // A spilling backend that does not override `memory_size` reports 0,
+ // reflecting that its blobs no longer occupy the heap.
+ struct OffHeap;
+ impl PageStore for OffHeap {
+ fn put(&mut self, _value: Bytes) -> Result<PageKey> {
+ Ok(PageKey::new(0))
+ }
+ fn take(&mut self, _key: PageKey) -> Result<Bytes> {
+ Ok(Bytes::new())
+ }
+ }
+ assert_eq!(OffHeap.memory_size(), 0);
+ }
+}
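To make the `memory_size` contract concrete, here is a minimal sketch of a backend that keeps small blobs on the heap and spills large ones. It assumes a local mirror of the `PageStore` trait that uses `std::io::Result` so only the standard library is needed; `HybridStore`, `Slot` and the size `threshold` are hypothetical. Only heap-resident blobs are counted by `memory_size`, as the trait documentation asks of a backend that keeps a partial in-memory buffer.

```rust
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Local mirror of `PageKey` (an opaque handle minted by the store).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PageKey(u64);

/// Local mirror of the `PageStore` trait above, returning `io::Result` so the
/// sketch needs only the standard library.
trait PageStore {
    fn put(&mut self, value: Vec<u8>) -> io::Result<PageKey>;
    fn take(&mut self, key: PageKey) -> io::Result<Vec<u8>>;
    fn memory_size(&self) -> usize {
        0
    }
}

/// Where one blob currently lives.
enum Slot {
    Heap(Vec<u8>),
    Spilled { offset: u64, len: usize },
    Taken,
}

/// Hypothetical hybrid backend: blobs shorter than `threshold` stay on the
/// heap, larger ones are appended to `backing` (a temp file in practice).
struct HybridStore<B: Read + Write + Seek> {
    backing: B,
    end: u64,
    threshold: usize,
    slots: Vec<Slot>,
    resident: usize,
}

impl<B: Read + Write + Seek> HybridStore<B> {
    fn new(backing: B, threshold: usize) -> Self {
        Self { backing, end: 0, threshold, slots: Vec::new(), resident: 0 }
    }
}

impl<B: Read + Write + Seek> PageStore for HybridStore<B> {
    fn put(&mut self, value: Vec<u8>) -> io::Result<PageKey> {
        let key = PageKey(self.slots.len() as u64);
        if value.len() < self.threshold {
            self.resident += value.len();
            self.slots.push(Slot::Heap(value));
        } else {
            // Append at the logical end: a prior `take` may have moved the cursor.
            self.backing.seek(SeekFrom::Start(self.end))?;
            self.backing.write_all(&value)?;
            self.slots.push(Slot::Spilled { offset: self.end, len: value.len() });
            self.end += value.len() as u64;
        }
        Ok(key)
    }

    fn take(&mut self, key: PageKey) -> io::Result<Vec<u8>> {
        let invalid = move || io::Error::new(io::ErrorKind::NotFound, format!("invalid page key {}", key.0));
        let slot = self.slots.get_mut(key.0 as usize).ok_or_else(invalid)?;
        // Leave `Taken` behind so a heap blob is released as soon as it is taken.
        match std::mem::replace(slot, Slot::Taken) {
            Slot::Heap(blob) => {
                self.resident -= blob.len();
                Ok(blob)
            }
            Slot::Spilled { offset, len } => {
                let mut buf = vec![0u8; len];
                self.backing.seek(SeekFrom::Start(offset))?;
                self.backing.read_exact(&mut buf)?;
                Ok(buf)
            }
            Slot::Taken => Err(invalid()),
        }
    }

    /// Only heap-resident blobs count; spilled bytes no longer occupy the heap.
    fn memory_size(&self) -> usize {
        self.resident
    }
}
```

The backing is any `Read + Write + Seek`; a real backend would presumably use a temp file, as the `TempFilePageStore` test in this commit does via `tempfile::tempfile()`, while `std::io::Cursor<Vec<u8>>` is only convenient for a quick check. Unlike `InMemoryPageStore`, which hands back an empty `Bytes` on a repeated `take`, this sketch returns an error; both are compatible with the documented promise that a key is not requested twice.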
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 090350f53a..aa9cef16c5 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -209,6 +209,37 @@ pub struct ColumnCloseResult {
pub offset_index: Option<OffsetIndexMetaData>,
}
+impl ColumnCloseResult {
+ /// Rewrite the page offsets for a dictionary-first on-disk layout.
+ ///
+ /// A writer that buffers the whole column chunk and splices it later (the
+ /// Arrow path) may accept the data pages *before* the dictionary page so the
+ /// data pages can stream straight through, then emit the dictionary page
+ /// first at splice. The offsets recorded during encoding therefore assume a
+ /// data-pages-first layout; call this with the serialized length of the
+ /// dictionary page to move it to offset 0 and shift every data page after
+ /// it. A `dictionary_len` of 0 (no dictionary page) leaves the result
+ /// unchanged.
+ pub fn update_dictionary_location(mut self, dictionary_len: usize) -> Result<Self> {
+ if dictionary_len > 0 {
+ self.metadata = self
+ .metadata
+ .into_builder()
+ .set_dictionary_page_offset(Some(0))
+ .set_data_page_offset(dictionary_len as i64)
+ .build()?;
+ if let Some(offset_index) = self.offset_index.as_mut() {
+ let mut offset = dictionary_len as i64;
+ for location in offset_index.page_locations.iter_mut() {
+ location.offset = offset;
+ offset += location.compressed_page_size as i64;
+ }
+ }
+ }
+ Ok(self)
+ }
+}
+
// Metrics per page
#[derive(Default)]
struct PageMetrics {
@@ -685,7 +716,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// of the current memory usage and not the final anticipated encoded size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
- self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
+ // In-flight encoder buffers, plus any completed pages still held on the
+ // heap: the dictionary-column data pages buffered here (column-at-a-time
+ // path), plus whatever the page writer keeps resident. A page writer
+ // that spills completed pages off-heap reports far less than the bytes
+ // it was handed, so this tracks real memory rather than bytes written.
+ self.encoder.estimated_memory_size()
+ + self
+ .data_pages
+ .iter()
+ .map(|page| page.memory_usage())
+ .sum::<usize>()
+ + self.page_writer.buffered_memory_size()
}
/// Returns total number of bytes written by this column writer so far.
@@ -1387,7 +1429,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};
// Check if we need to buffer data page or flush it to the sink directly.
- if self.encoder.has_dictionary() {
+ //
+ // For dictionary-encoded columns the dictionary page must be written
+ // first, but it is not final until all values are seen, so completed
+ // data pages are normally buffered here until `close`. A page writer
+ // that defers final layout (the Arrow path) instead orders pages itself
+ // at flush, so we stream the data pages straight through and never let
+ // them accumulate in memory.
+ if self.encoder.has_dictionary() && !self.page_writer.defers_dictionary_ordering() {
self.data_pages.push_back(compressed_page);
} else {
self.write_data_page(compressed_page)?;
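The offset rewrite in `update_dictionary_location` is plain arithmetic, so it can be checked in isolation. The sketch below reproduces it on hypothetical stand-in types (`PageLocation` reduced to `offset` and `compressed_page_size`, and a `ChunkLayout` in place of the column metadata); it is not the crate's code. The dictionary page moves to chunk offset 0, the first data page starts at `dictionary_len`, and each later page starts where the previous one ended.

```rust
/// Hypothetical stand-in for one offset-index entry.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PageLocation {
    offset: i64,
    compressed_page_size: i32,
}

/// Hypothetical stand-in for the chunk-relative offsets in `ColumnCloseResult`.
#[derive(Debug, Clone, PartialEq)]
struct ChunkLayout {
    dictionary_page_offset: Option<i64>,
    data_page_offset: i64,
    pages: Vec<PageLocation>,
}

/// Mirrors the arithmetic of `ColumnCloseResult::update_dictionary_location`:
/// the dictionary page moves to offset 0 and the data pages are laid out
/// back-to-back after it. A `dictionary_len` of 0 leaves the layout unchanged.
fn update_dictionary_location(mut layout: ChunkLayout, dictionary_len: usize) -> ChunkLayout {
    if dictionary_len > 0 {
        layout.dictionary_page_offset = Some(0);
        layout.data_page_offset = dictionary_len as i64;
        let mut offset = dictionary_len as i64;
        for location in layout.pages.iter_mut() {
            location.offset = offset;
            offset += location.compressed_page_size as i64;
        }
    }
    layout
}
```

For example, a 100-byte dictionary page in front of data pages of 10, 20 and 30 bytes yields data-page offsets 100, 110 and 130.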
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 942013ea62..8ec16ba367 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -684,6 +684,30 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
pub fn append_column<R: ChunkReader>(
&mut self,
reader: &R,
+ close: ColumnCloseResult,
+ ) -> Result<()> {
+ // Position a reader at the start of the buffered chunk, then splice the
+ // bytes through the shared streaming path.
+ let metadata = &close.metadata;
+ let src_offset = metadata
+ .dictionary_page_offset()
+ .unwrap_or_else(|| metadata.data_page_offset());
+ let read = reader.get_read(src_offset as _)?;
+ self.append_column_from_read(read, close)
+ }
+
+ /// Splice an already-encoded column chunk into the row group, reading its
+ /// bytes sequentially from `read`.
+ ///
+ /// `read` must be positioned at the start of the chunk (the dictionary page
+ /// if present, otherwise the first data page — i.e. `src_offset` below) and
+ /// yield exactly the chunk's compressed bytes. Unlike [`Self::append_column`]
+ /// this consumes an owned [`Read`], which lets the caller stream the bytes
+ /// back from a [`PageStore`](crate::column::page_store::PageStore) one page
+ /// at a time without materializing the whole chunk in memory.
+ pub(crate) fn append_column_from_read<R: Read>(
+ &mut self,
+ read: R,
mut close: ColumnCloseResult,
) -> Result<()> {
self.assert_previous_writer_closed()?;
@@ -707,7 +731,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
let src_length = metadata.compressed_size();
let write_offset = self.buf.bytes_written();
- let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
+ let mut read = read.take(src_length as _);
let write_length = std::io::copy(&mut read, &mut self.buf)?;
if src_length as u64 != write_length {
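`append_column_from_read` only needs a `Read` positioned at the start of the chunk, so the bytes can come back from a store one page at a time. The sketch below assumes a hypothetical `PageChainReader` that fetches each blob lazily through a closure standing in for `PageStore::take`, plus a hypothetical `splice_chunk` helper that mirrors the `take(src_length)` and `std::io::copy` length check shown in the hunk. The reader holds at most one page at any time.

```rust
use std::collections::VecDeque;
use std::io::{self, Read, Write};

/// Hypothetical `Read` over a chunk's pages: each blob is fetched lazily through
/// `fetch` (a stand-in for `PageStore::take`), so only one page is held at a time.
struct PageChainReader<F: FnMut(u64) -> io::Result<Vec<u8>>> {
    keys: VecDeque<u64>,
    fetch: F,
    current: Vec<u8>,
    pos: usize,
}

impl<F: FnMut(u64) -> io::Result<Vec<u8>>> PageChainReader<F> {
    fn new(keys: impl IntoIterator<Item = u64>, fetch: F) -> Self {
        Self { keys: keys.into_iter().collect(), fetch, current: Vec::new(), pos: 0 }
    }
}

impl<F: FnMut(u64) -> io::Result<Vec<u8>>> Read for PageChainReader<F> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Move to the next page once the current one is drained (skipping empty
        // pages); replacing `current` drops the previous page's bytes.
        while self.pos == self.current.len() {
            match self.keys.pop_front() {
                Some(key) => {
                    self.current = (self.fetch)(key)?;
                    self.pos = 0;
                }
                None => return Ok(0), // every page consumed: end of chunk
            }
        }
        let n = buf.len().min(self.current.len() - self.pos);
        buf[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Copy exactly `len` bytes of `read` into `out`, failing on a short chunk, like
/// the `take(src_length)` + `std::io::copy` check in `append_column_from_read`.
fn splice_chunk<R: Read, W: Write>(read: R, len: u64, out: &mut W) -> io::Result<u64> {
    let written = io::copy(&mut read.take(len), out)?;
    if written != len {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            format!("expected {len} bytes, copied {written}"),
        ));
    }
    Ok(written)
}
```

Because `take` caps the copy at the chunk length, the reader never asks the store for pages past the end of the chunk.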
diff --git a/parquet/tests/arrow_writer.rs b/parquet/tests/arrow_writer.rs
index 020b4c6267..e4cc101000 100644
--- a/parquet/tests/arrow_writer.rs
+++ b/parquet/tests/arrow_writer.rs
@@ -17,13 +17,22 @@
//! Tests for [`ArrowWriter`]
-use arrow::array::Float64Array;
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::record_batch::RecordBatch;
+use std::alloc::{GlobalAlloc, Layout, System};
+use std::cell::Cell;
+use std::fs::File;
+use std::io::{Read as _, Seek, SeekFrom, Write as _};
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, BinaryArray, Float64Array, Int32Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use bytes::Bytes;
use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_writer::{
+ ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
+};
use parquet::basic::Encoding;
+use parquet::errors::Result;
use parquet::file::properties::WriterProperties;
-use std::sync::Arc;
#[test]
#[should_panic(
@@ -48,3 +57,329 @@ fn test_delta_bit_pack_type() {
let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), Some(props)).unwrap();
let _ = writer.write(&record_batch);
}
+
+// ---------------------------------------------------------------------------
+// Heap-memory regression test for the writer's page buffering.
+//
+// This proves the headline invariant of the pluggable [`PageStore`]: while a
+// row group is being written, the heap used to buffer completed pages grows
+// with the row group size for the default in-memory store, but stays bounded
+// (≈ a few pages per leaf column) once a spilling backend is plugged in.
+//
+// Peak heap is measured with a thread-local tracking allocator (the same
+// pattern used by `parquet/benches/arrow_reader_peak_memory.rs`), so the test
+// needs no external profiling dependency. Tracking is thread-local, so the
+// measured peak reflects only allocations made on the measuring thread and the
+// threads of unrelated tests in this binary do not perturb it. The default
+// `ArrowWriter` is single-threaded, so the writer's buffering all lands there.
+// Each measurement resets the peak to the current live baseline and reports
+// the delta, so allocations made before the measurement began do not count.
+//
+// [`PageStore`]: parquet::arrow::arrow_writer::PageStore
+// ---------------------------------------------------------------------------
+
+thread_local! {
+ static LIVE_BYTES: Cell<usize> = const { Cell::new(0) };
+ static PEAK_BYTES: Cell<usize> = const { Cell::new(0) };
+}
+
+struct TrackingAllocator {
+ inner: System,
+}
+
+#[global_allocator]
+static GLOBAL: TrackingAllocator = TrackingAllocator { inner: System };
+
+fn add_live_bytes(size: usize) {
+ LIVE_BYTES.with(|live| {
+ let new = live.get().saturating_add(size);
+ live.set(new);
+ PEAK_BYTES.with(|peak| {
+ if new > peak.get() {
+ peak.set(new);
+ }
+ });
+ });
+}
+
+fn subtract_live_bytes(size: usize) {
+ LIVE_BYTES.with(|live| {
+ live.set(live.get().saturating_sub(size));
+ });
+}
+
+#[allow(unsafe_code)]
+unsafe impl GlobalAlloc for TrackingAllocator {
+ unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+ let ptr = unsafe { self.inner.alloc(layout) };
+ if !ptr.is_null() {
+ add_live_bytes(layout.size());
+ }
+ ptr
+ }
+
+ unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+ subtract_live_bytes(layout.size());
+ unsafe { self.inner.dealloc(ptr, layout) };
+ }
+
+ unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+ let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) };
+ if !new_ptr.is_null() {
+ let old_size = layout.size();
+ if new_size > old_size {
+ add_live_bytes(new_size - old_size);
+ } else {
+ subtract_live_bytes(old_size - new_size);
+ }
+ }
+ new_ptr
+ }
+}
+
+/// Run `f` and return the peak *additional* live heap (bytes) observed on this
+/// thread during it — the delta from the live heap when `f` began.
+fn peak_heap_bytes(f: impl FnOnce()) -> usize {
+ let start = LIVE_BYTES.with(Cell::get);
+ // Reset the peak to the window's baseline so prior allocations don't count.
+ PEAK_BYTES.with(|peak| peak.set(start));
+ f();
+ PEAK_BYTES.with(Cell::get).saturating_sub(start)
+}
+
+/// Width of each value in the one "fat" column, in bytes.
+const FAT_VALUE_LEN: usize = 4096;
+/// Rows per input batch fed to the writer. Kept small so each batch is dropped
+/// promptly — only the writer's *buffering* should accumulate, not the input.
+const ROWS_PER_BATCH: usize = 64;
+/// Number of batches, all funnelled into a single large row group.
+const NUM_BATCHES: usize = 64;
+/// Total bytes of fat-column payload written (≈ 16 MiB).
+const TOTAL_FAT_BYTES: usize = FAT_VALUE_LEN * ROWS_PER_BATCH * NUM_BATCHES;
+
+/// A wide schema: one fat, high-cardinality binary column (the spill target)
+/// plus several tiny integer columns.
+fn skewed_schema() -> SchemaRef {
+ let mut fields = vec![Field::new("fat", DataType::Binary, false)];
+ for i in 0..8 {
+ fields.push(Field::new(format!("small_{i}"), DataType::Int32, false));
+ }
+ Arc::new(Schema::new(fields))
+}
+
+/// Build one batch of `ROWS_PER_BATCH` rows. The fat column holds unique,
+/// high-entropy values (so they neither dictionary-encode nor compress away),
+/// derived deterministically from `batch_index`.
+fn make_batch(schema: &SchemaRef, batch_index: usize) -> RecordBatch {
+ let mut fat: Vec<u8> = vec![0u8; FAT_VALUE_LEN * ROWS_PER_BATCH];
+ // A cheap xorshift fill keyed by the batch index → distinct, incompressible.
+ let mut state = (batch_index as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1;
+ for byte in fat.iter_mut() {
+ state ^= state << 13;
+ state ^= state >> 7;
+ state ^= state << 17;
+ *byte = (state >> 24) as u8;
+ }
+ let offsets: Vec<i32> = (0..=ROWS_PER_BATCH)
+ .map(|i| (i * FAT_VALUE_LEN) as i32)
+ .collect();
+ let fat_array = BinaryArray::try_new(
+ arrow::buffer::OffsetBuffer::new(offsets.into()),
+ arrow::buffer::Buffer::from_vec(fat),
+ None,
+ )
+ .unwrap();
+
+ let mut columns: Vec<ArrayRef> = vec![Arc::new(fat_array)];
+ for c in 0..8 {
+ let vals: Vec<i32> = (0..ROWS_PER_BATCH)
+ .map(|r| (batch_index * ROWS_PER_BATCH + r + c) as i32)
+ .collect();
+ columns.push(Arc::new(Int32Array::from(vals)));
+ }
+ RecordBatch::try_new(schema.clone(), columns).unwrap()
+}
+
+/// Writer properties forcing the whole dataset into a single, uncompressed row
+/// group (so the page buffer is the only thing that grows).
+fn single_row_group_props() -> WriterProperties {
+ WriterProperties::builder()
+ .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+ // One row group for everything: never auto-flush on row count.
+ .set_max_row_group_row_count(Some(ROWS_PER_BATCH * NUM_BATCHES * 2))
+ .build()
+}
+
+/// Write the full skewed dataset with the given writer options, feeding small
+/// batches (each dropped immediately) into one row group.
+///
+/// The output is sent to [`std::io::sink`] so the produced file bytes never live on
+/// the heap — the measured peak then reflects only the writer's internal page
+/// *buffering*, which is exactly what a [`PageStore`] governs.
+fn write_skewed_dataset(options: ArrowWriterOptions) {
+ let schema = skewed_schema();
+ let mut writer =
+ ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap();
+ for b in 0..NUM_BATCHES {
+ let batch = make_batch(&schema, b);
+ writer.write(&batch).unwrap();
+ // `batch` dropped here — only the writer's internal buffering persists.
+ }
+ writer.close().unwrap();
+}
+
+/// A spilling [`PageStore`]: one temp file per column chunk. `put` appends the
+/// blob and records its `(offset, len)`; `take` seeks and reads it back. The
+/// file is unlinked on creation (via [`tempfile::tempfile`]) so it is cleaned up
+/// when the store is dropped. This is the canonical "spill completed pages off
+/// the heap" backend the design targets.
+struct TempFilePageStore {
+ file: File,
+ end: u64,
+ locs: Vec<(u64, usize)>,
+}
+
+impl TempFilePageStore {
+ fn new() -> Result<Self> {
+ Ok(Self {
+ file: tempfile::tempfile()?,
+ end: 0,
+ locs: Vec::new(),
+ })
+ }
+}
+
+impl PageStore for TempFilePageStore {
+ fn put(&mut self, value: Bytes) -> Result<PageKey> {
+ // Always append at the logical end (a prior `take` may have moved the
+ // OS file cursor).
+ self.file.seek(SeekFrom::Start(self.end))?;
+ self.file.write_all(&value)?;
+ let key = PageKey::new(self.locs.len() as u64);
+ self.locs.push((self.end, value.len()));
+ self.end += value.len() as u64;
+ Ok(key)
+ }
+
+ fn take(&mut self, key: PageKey) -> Result<Bytes> {
+ let (offset, len) = self.locs[key.get() as usize];
+ let mut buf = vec![0u8; len];
+ self.file.seek(SeekFrom::Start(offset))?;
+ self.file.read_exact(&mut buf)?;
+ Ok(Bytes::from(buf))
+ }
+}
+
+#[derive(Debug, Default)]
+struct TempFilePageStoreFactory;
+
+impl PageStoreFactory for TempFilePageStoreFactory {
+ fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+ Ok(Box::new(TempFilePageStore::new()?))
+ }
+}
+
+/// Rows per batch / batches for the dictionary-column scenario (~4.2M rows).
+const DICT_ROWS_PER_BATCH: usize = 8192;
+const DICT_NUM_BATCHES: usize = 512;
+
+/// Write a single, low-cardinality (16 distinct values), high-row-count column
+/// as one row group. Such a column stays dictionary-encoded, so its completed
+/// data pages would historically pile up in `GenericColumnWriter` until close —
+/// the second accumulation point that plain page-buffer spilling does not reach.
+fn write_dict_dataset(options: ArrowWriterOptions) {
+ let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)]));
+ let props = WriterProperties::builder()
+ .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+ .set_max_row_group_row_count(Some(DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES * 2))
+ .build();
+ let options = options.with_properties(props);
+ let mut writer =
+ ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap();
+ for b in 0..DICT_NUM_BATCHES {
+ let vals: Vec<i32> = (0..DICT_ROWS_PER_BATCH)
+ .map(|r| ((b + r) % 16) as i32)
+ .collect();
+ let batch =
+ RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vals))]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+}
+
+/// All measurements run in this one test so they execute sequentially on a
+/// single thread. Because the tracking allocator is thread-local, separate
+/// parallel tests would each still see only their own thread's allocations,
+/// but keeping the measurements together also runs the in-memory and spill
+/// comparisons against one consistent baseline.
+#[test]
+fn page_store_bounds_write_memory() {
+ let props = single_row_group_props();
+
+ // Baseline: the default in-memory store buffers the whole row group, so peak
+ // heap is at least the size of the buffered column data.
+ let in_memory_peak = peak_heap_bytes(|| {
+ let opts = ArrowWriterOptions::new().with_properties(props.clone());
+ write_skewed_dataset(opts);
+ });
+
+ // Spilling: the temp-file store keeps completed pages off the heap, so peak
+ // heap stays bounded by the in-flight encoder/dictionary buffers plus a page
+ // or two in flight — independent of the row group size.
+ let spill_peak = peak_heap_bytes(|| {
+ let opts = ArrowWriterOptions::new()
+ .with_properties(props.clone())
+ .with_page_store_factory(Arc::new(TempFilePageStoreFactory));
+ write_skewed_dataset(opts);
+ });
+
+ eprintln!(
+ "peak heap — in-memory: {:.1} MiB, temp-file spill: {:.1} MiB (total fat payload {:.1} MiB)",
+ in_memory_peak as f64 / (1024.0 * 1024.0),
+ spill_peak as f64 / (1024.0 * 1024.0),
+ TOTAL_FAT_BYTES as f64 / (1024.0 * 1024.0),
+ );
+
+ // The in-memory store must hold most of the ~16 MiB of buffered data.
+ let in_memory_floor = TOTAL_FAT_BYTES * 3 / 4;
+ assert!(
+ in_memory_peak >= in_memory_floor,
+ "expected in-memory peak >= {in_memory_floor} bytes, got {in_memory_peak}"
+ );
+
+ // The spilling store must stay near the per-column bound, roughly
+ // (data_page_size + dict_page_size) per leaf column, and far below the
+ // in-memory baseline. Only the fat column approaches that bound: each of the
+ // eight Int32 columns holds just 4096 rows × 4 bytes = 16 KiB in total. We
+ // assert a generous 8 MiB ceiling (well under the ~16 MiB row group) to stay
+ // robust across platforms.
+ const SPILL_CEILING: usize = 8 * 1024 * 1024;
+ assert!(
+ spill_peak < SPILL_CEILING,
+ "expected spilling peak < {SPILL_CEILING} bytes (bounded by page/dict size × columns), \
+ got {spill_peak}"
+ );
+ assert!(
+ spill_peak * 2 < in_memory_peak,
+ "expected spilling peak ({spill_peak}) to be far below the in-memory baseline \
+ ({in_memory_peak})"
+ );
+
+ // Dictionary-encoded column: completed data pages reach the page writer (and
+ // thus the store) as they are produced, so spilling bounds them too.
+ let dict_in_memory = peak_heap_bytes(|| write_dict_dataset(ArrowWriterOptions::new()));
+ let dict_spill = peak_heap_bytes(|| {
+ write_dict_dataset(
+ ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory)),
+ )
+ });
+ eprintln!(
+ "dict column ({} rows) peak heap — in-memory: {:.2} MiB, temp-file spill: {:.2} MiB",
+ DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES,
+ dict_in_memory as f64 / (1024.0 * 1024.0),
+ dict_spill as f64 / (1024.0 * 1024.0),
+ );
+ assert!(
+ dict_spill * 2 < dict_in_memory,
+ "expected dict-column spilling peak ({dict_spill}) to be far below the in-memory \
+ baseline ({dict_in_memory}) — dictionary data pages should spill, not accumulate"
+ );
+}