This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 8042ea288e Pluggable page spilling API for the Parquet ArrowWriter (PageStore) (#10020)
8042ea288e is described below

commit 8042ea288e084107b602f9e25a850314942567b6
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Thu Jun 4 12:24:25 2026 -0500

    Pluggable page spilling API for the Parquet ArrowWriter (PageStore) (#10020)
    
    - closes https://github.com/apache/arrow-rs/issues/10071
    
    ## Problem description
    
    We currently buffer entire row groups in memory. From [our own
    docs](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#memory-usage-and-limiting):
    
    > The nature of Parquet requires buffering of an entire row group before
    it can be flushed to the underlying writer.
    
    For our production workload, where we have ~400 columns with large data
    skews (some much larger than others), this causes at least 12 GB of
    memory to be consumed *just to write Parquet*.
    
    When `ArrowWriter` writes a row group, record batches arrive with all
    columns interleaved, but each Parquet column chunk must be contiguous on
    disk. So every column's *compressed* pages are buffered for the whole row
    group and only spliced into the output at flush. Peak `ArrowWriter` memory
    is therefore ≈ Σ(compressed bytes of every column chunk) for one row
    group, and it grows with the row group size.
    
    Today the only lever against this is
    [ArrowWriter::in_progress_size](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#method.in_progress_size)
    plus flushing smaller row groups, which trades away compression and
    read-time (page/row-group) pruning. This has negative consequences for
    encoding efficiency, read performance, etc. Parquet already has pages, so
    we don't need one column to force the layout of another. Ideally, what
    we'd want in a case like this is a large (let's just say 1M row) row group
    with ~4 1MB pages for the `id: i32` column and N ~1MB pages for the
    `large_text` column. Reading the small id column then has no data
    fragmentation penalty, no page index bloat penalty, etc.
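
    As a rough check of the "~4 1MB pages" figure above, here is a minimal
    sketch of the arithmetic. The helper `pages_needed` is hypothetical (it is
    not part of the parquet crate) and assumes fixed-width, plain-encoded
    values with no compression and no definition/repetition levels:

```rust
/// Hypothetical helper: how many pages a column needs if every value takes
/// `bytes_per_value` bytes and each page holds at most `page_bytes` bytes.
/// Ignores compression, encodings, and level data (a deliberate simplification).
fn pages_needed(rows: u64, bytes_per_value: u64, page_bytes: u64) -> u64 {
    let total_bytes = rows * bytes_per_value;
    // Ceiling division: a partially filled last page still counts as a page.
    (total_bytes + page_bytes - 1) / page_bytes
}
```

    With 1M rows of 4-byte `i32` values and a 1 MB (1,000,000 byte) page
    target, `pages_needed(1_000_000, 4, 1_000_000)` evaluates to 4.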
    
    ## Related issues
    
    Some of the issues I could find:
    
    - https://github.com/apache/arrow-rs/issues/5828
    - https://github.com/apache/arrow-rs/issues/5450
    - https://github.com/apache/arrow-rs/issues/5484
    
    ## Proposed solution
    
    Introduce a trait for pluggable buffering. In particular we would like
    to implement spilling (spill buffered completed pages to disk). If this
    works well it can be upstreamed / made easily configurable and usable
    for all arrow users. I am not adding an implementation here to avoid
    discussing those APIs (is it a temp dir, how does it get configured,
    etc.).
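
    To make the idea concrete, here is a minimal sketch of what a spilling
    backend could look like. It is not the implementation used by this PR's
    tests. To stay standard-library only it uses a local stand-in trait,
    `BlobStore` (hypothetical), that mirrors the `put`/`take`/`memory_size`
    shape of `PageStore` but with `Vec<u8>` and `u64` in place of `Bytes` and
    `PageKey`. `TempFileStore` and its scratch-file naming are likewise
    assumptions made for illustration:

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;

/// Hypothetical stand-in for `PageStore`: plain `u64` keys and `Vec<u8>` blobs.
trait BlobStore {
    fn put(&mut self, value: Vec<u8>) -> io::Result<u64>;
    fn take(&mut self, key: u64) -> io::Result<Vec<u8>>;
    /// Bytes of blob data kept resident on the heap.
    fn memory_size(&self) -> usize;
}

/// Appends every blob to one scratch file and remembers where it landed.
struct TempFileStore {
    file: File,
    path: PathBuf,
    /// `(offset, len)` per key; `None` once the blob has been taken.
    slots: Vec<Option<(u64, usize)>>,
    /// Current end of the scratch file, i.e. where the next blob is written.
    end: u64,
}

impl TempFileStore {
    /// Creates the scratch file in the OS temp dir (the naming scheme is an assumption).
    fn create(name: &str) -> io::Result<Self> {
        let path = std::env::temp_dir().join(format!("{}-{}.spill", name, std::process::id()));
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)?;
        Ok(Self { file, path, slots: Vec::new(), end: 0 })
    }
}

impl BlobStore for TempFileStore {
    fn put(&mut self, value: Vec<u8>) -> io::Result<u64> {
        self.file.seek(SeekFrom::Start(self.end))?;
        self.file.write_all(&value)?;
        self.slots.push(Some((self.end, value.len())));
        self.end += value.len() as u64;
        Ok((self.slots.len() - 1) as u64)
    }

    fn take(&mut self, key: u64) -> io::Result<Vec<u8>> {
        // Each blob can be taken exactly once; unknown or reused keys are errors.
        let slot = self.slots.get_mut(key as usize).and_then(|s| s.take());
        let (offset, len) = slot.ok_or_else(|| {
            io::Error::new(io::ErrorKind::NotFound, format!("invalid key {key}"))
        })?;
        let mut buf = vec![0u8; len];
        self.file.seek(SeekFrom::Start(offset))?;
        self.file.read_exact(&mut buf)?;
        Ok(buf)
    }

    fn memory_size(&self) -> usize {
        0 // blobs live on disk; only the small slot index stays in memory
    }
}

impl Drop for TempFileStore {
    fn drop(&mut self) {
        // Best-effort cleanup of the scratch file.
        let _ = std::fs::remove_file(&self.path);
    }
}
```

    A real backend would also have to decide where the file lives, how it is
    configured, and how errors and cleanup are handled, which are exactly the
    API questions deferred above.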
    
    ## What changes are included in this PR?
    
    A small, intentionally "dumb" key/value store trait and its wiring, in
    four stacked commits:
    
    1. **`PageStore` + `PageKey` + `PageStoreFactory` + `InMemoryPageStore`,
       wired into `ArrowWriter`.** The store maps an opaque, store-allocated
       `PageKey` to a blob of bytes and knows nothing about pages,
       dictionaries, ordering, or offsets; the caller keeps the handles and
       decides what they mean. The default `InMemoryPageStore` (a
       `Vec<Bytes>`) adds no overhead and produces equivalent, valid output:
       byte-for-byte identical to the previous buffering for non-dictionary
       columns, while dictionary columns differ only in the
       `page_encoding_stats` ordering (see commit 4). A `PageStoreFactory` is
       threaded through `ArrowWriterOptions::with_page_store_factory` →
       `ArrowRowGroupWriterFactory` → `ArrowColumnWriterFactory`.
    2. **Stream column chunks out of the store at splice.** Replaces the
       materialize-then-copy splice with a `Read` that takes each page blob
       back out of the store in write order *as it is consumed* and releases
       it immediately, so the splice never holds more than one page in memory
       at a time (essential for a spilling backend on skewed schemas).
       `append_column` is unchanged for external `ChunkReader` callers.
    3. **Public `PageKey::new`/`get`** (so external backends can mint their
       own handles) **+ a memory regression test** (in-tree thread-local
       tracking allocator) with a temp-file backend.
    4. **Spill dictionary-column data pages too.** Dictionary-encoded columns
       buffered every completed data page in `GenericColumnWriter.data_pages`
       until `close()` (the dictionary page must be written first but isn't
       final until all values are seen), so those pages never reached the
       store. A new `PageWriter::defers_dictionary_ordering()` lets a writer
       that buffers the whole chunk and splices later (the Arrow path) accept
       data pages *before* the dictionary page and order them itself; the
       column writer then streams dictionary-column data pages straight
       through. `ArrowPageWriter` holds the (bounded, ≤ `dict_page_size_limit`)
       dictionary page in memory (it now arrives last) and emits it first at
       splice, where the production-order page offsets are rewritten to the
       dictionary-first layout. Because the data page is now produced before
       the dictionary page, the chunk's `page_encoding_stats` lists the
       data-page entry before the dictionary-page entry (the reverse of the
       previous behavior on `main`); the Parquet spec defines this list as an
       unordered set, so the output stays valid. The column-at-a-time
       `SerializedFileWriter` path is unchanged (it commits bytes live and
       still buffers, which is inherent there). This commit also fixes
       `memory_size()` to report bytes the writer actually holds *resident*
       (via `PageStore::memory_size` / `PageWriter::buffered_memory_size`)
       rather than bytes written, so it drops to ~0 once pages are spilled
       off-heap.
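
    The offset rewrite described in commit 4 can be illustrated with a small
    sketch. This is not the PR's code: `PageLoc` and `dictionary_first` are
    hypothetical names, and the sketch assumes at most one dictionary page,
    produced after all data pages:

```rust
/// Hypothetical record of one page's position inside a column chunk.
#[derive(Debug, Clone, Copy, PartialEq)]
struct PageLoc {
    is_dictionary: bool,
    offset: u64,
    len: u64,
}

/// Rewrites production-order locations (data pages first, dictionary page
/// last) into the dictionary-first layout, returned in final file order.
/// Assumes at most one dictionary page, produced after every data page.
fn dictionary_first(production_order: &[PageLoc]) -> Vec<PageLoc> {
    // The chunk starts wherever the earliest produced page was recorded.
    let chunk_start = production_order.iter().map(|p| p.offset).min().unwrap_or(0);
    let dict_len: u64 = production_order
        .iter()
        .filter(|p| p.is_dictionary)
        .map(|p| p.len)
        .sum();

    let mut out = Vec::with_capacity(production_order.len());
    // The dictionary page moves to the very start of the chunk.
    for p in production_order.iter().filter(|p| p.is_dictionary) {
        out.push(PageLoc { offset: chunk_start, ..*p });
    }
    // Data pages keep their relative order, shifted past the dictionary page.
    for p in production_order.iter().filter(|p| !p.is_dictionary) {
        out.push(PageLoc { offset: p.offset + dict_len, ..*p });
    }
    out
}
```

    For two data pages of 100 and 150 bytes followed by a 40-byte dictionary
    page, the dictionary page lands at offset 0 and the data pages at offsets
    40 and 140.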
    
    ## Are these changes tested?
    
    Yes:
    
    - A **byte-identical round-trip test** using a custom `PageStore` with
      sparse, non-contiguous, `HashMap`-backed handles, proving the writer
      relies only on the opaque-handle contract across dictionary and
      non-dictionary columns and multiple row groups.
    - A **dictionary round-trip test with the offset index disabled**,
      covering the path where only the chunk-level dictionary/data page
      offsets are rewritten.
    - Unit tests for the in-memory backend contract and its resident-byte
      reporting.
    - An **always-on memory regression test**
      (`page_store_bounds_write_memory` in `parquet/tests/arrow_writer.rs`,
      using an in-tree thread-local tracking allocator) measuring peak heap,
      for both a skewed wide row group (~16 MiB) and a low-cardinality
      dictionary column (~4.2M rows):
    
      | scenario | in-memory store | temp-file spill |
      |---|---|---|
      | skewed ~16 MiB row group | ~18.3 MiB | ~4.2 MiB |
      | dictionary column, 4.2M rows | ~2.69 MiB | ~0.48 MiB |
    
      i.e. the spilling backend bounds peak write memory by the in-flight
      encoder/dictionary buffers rather than the row group size, for both the
      page buffer and the dictionary-column data pages.
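
    For readers unfamiliar with the technique, peak heap can be measured with
    a wrapping global allocator. The sketch below is a simplified variant, not
    the in-tree allocator: it uses process-wide atomics instead of
    thread-local counters, so allocations from other threads are counted too.
    `PeakAlloc` and `peak_during` are hypothetical names:

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Bytes currently allocated and the high-water mark since the last reset.
static CURRENT: AtomicUsize = AtomicUsize::new(0);
static PEAK: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical allocator that forwards to `System` and tracks live bytes.
struct PeakAlloc;

unsafe impl GlobalAlloc for PeakAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            let now = CURRENT.fetch_add(layout.size(), Ordering::Relaxed) + layout.size();
            PEAK.fetch_max(now, Ordering::Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
        CURRENT.fetch_sub(layout.size(), Ordering::Relaxed);
    }
}

#[global_allocator]
static GLOBAL: PeakAlloc = PeakAlloc;

/// Runs `f` and returns its result plus the peak number of extra heap bytes
/// that were live at any point while it ran.
fn peak_during<R>(f: impl FnOnce() -> R) -> (R, usize) {
    let baseline = CURRENT.load(Ordering::Relaxed);
    PEAK.store(baseline, Ordering::Relaxed);
    let result = f();
    let peak = PEAK.load(Ordering::Relaxed);
    (result, peak.saturating_sub(baseline))
}
```

    Wrapping a write in `peak_during` for each backend gives the kind of
    comparison shown in the table; a thread-local variant, as used in the
    actual test, avoids noise from unrelated threads.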
    
    ## Are there any user-facing changes?
    
    New, additive public API (default behavior unchanged):
    
    - `ArrowWriterOptions::with_page_store_factory`
    - `PageStore`, `PageKey`, `PageStoreArgs`, `PageStoreFactory`,
      `InMemoryPageStore`, `InMemoryPageStoreFactory` (re-exported from
      `parquet::arrow::arrow_writer`, defined in
      `parquet::column::page_store`).
    - New defaulted `PageWriter` trait methods `defers_dictionary_ordering()`
      and `buffered_memory_size()` (both default to the previous behavior),
      and a defaulted `PageStore::memory_size()`.
    
    ## Not covered (by design)
    
    - The **column-at-a-time `SerializedFileWriter` path** still buffers
      dictionary-column data pages: it commits bytes to the file live, so the
      dictionary-first ordering must be resolved during encoding. That path
      already has minimal memory otherwise.
    - The in-flight **encoder buffer** and **dictionary** themselves stay
      resident (already bounded by the page/dict size limits), as do **bloom
      filters**.
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 parquet/src/arrow/arrow_writer/mod.rs | 439 ++++++++++++++++++++++++++++++----
 parquet/src/column/mod.rs             |   1 +
 parquet/src/column/page.rs            |  58 +++++
 parquet/src/column/page_store.rs      | 257 ++++++++++++++++++++
 parquet/src/column/writer/mod.rs      |  53 +++-
 parquet/src/file/writer.rs            |  26 +-
 parquet/tests/arrow_writer.rs         | 343 +++++++++++++++++++++++++-
 7 files changed, 1127 insertions(+), 50 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 86e6cf081f..21650fe26e 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -21,7 +21,6 @@ use crate::column::chunker::ContentDefinedChunker;
 
 use bytes::Bytes;
 use std::io::{Read, Write};
-use std::iter::Peekable;
 use std::slice::Iter;
 use std::sync::{Arc, Mutex};
 use std::vec::IntoIter;
@@ -37,6 +36,7 @@ use super::schema::{add_encoded_arrow_schema_to_metadata, decimal_length_from_pr
 
 use crate::arrow::ArrowSchemaConverter;
 use crate::arrow::arrow_writer::byte_array::ByteArrayEncoder;
+use crate::basic::PageType;
 use crate::column::page::{CompressedPage, PageWriteSpec, PageWriter};
 use crate::column::page_encryption::PageEncryptor;
 use crate::column::writer::encoder::ColumnValueEncoder;
@@ -49,7 +49,6 @@ use crate::encryption::encrypt::FileEncryptor;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{KeyValue, ParquetMetaData, RowGroupMetaData};
 use crate::file::properties::{WriterProperties, WriterPropertiesPtr};
-use crate::file::reader::{ChunkReader, Length};
 use crate::file::writer::{SerializedFileWriter, SerializedRowGroupWriter};
 use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr, SchemaDescriptor};
@@ -58,6 +57,12 @@ use levels::{ArrayLevels, calculate_array_levels};
 mod byte_array;
 mod levels;
 
+#[doc(inline)]
+pub use crate::column::page_store::{
+    InMemoryPageStore, InMemoryPageStoreFactory, PageKey, PageStore, PageStoreArgs,
+    PageStoreFactory,
+};
+
 /// Encodes [`RecordBatch`] to parquet
 ///
 /// Writes Arrow `RecordBatch`es to a Parquet writer. Multiple [`RecordBatch`] will be encoded
@@ -263,8 +268,12 @@ impl<W: Write + Send> ArrowWriter<W> {
         let file_writer =
             SerializedFileWriter::new(writer, schema.root_schema_ptr(), Arc::clone(&props_ptr))?;
 
-        let row_group_writer_factory =
+        let mut row_group_writer_factory =
             ArrowRowGroupWriterFactory::new(&file_writer, arrow_schema.clone());
+        if let Some(page_store_factory) = options.page_store_factory {
+            row_group_writer_factory =
+                row_group_writer_factory.with_page_store_factory(page_store_factory);
+        }
 
         let cdc_chunkers = props_ptr
             .content_defined_chunking()
@@ -556,6 +565,7 @@ pub struct ArrowWriterOptions {
     skip_arrow_metadata: bool,
     schema_root: Option<String>,
     schema_descr: Option<SchemaDescriptor>,
+    page_store_factory: Option<Arc<dyn PageStoreFactory>>,
 }
 
 impl ArrowWriterOptions {
@@ -569,6 +579,90 @@ impl ArrowWriterOptions {
         Self { properties, ..self }
     }
 
+    /// Sets the [`PageStoreFactory`] used to buffer completed pages while a row
+    /// group is being written.
+    ///
+    /// By default (an [`InMemoryPageStore`] per column chunk) completed pages
+    /// are buffered on the heap until the row group is flushed, so peak memory
+    /// grows with the row group size. Supplying a factory that spills to a temp
+    /// file or object storage instead bounds peak write memory, decoupling it
+    /// from the row group size while keeping large, read-optimal row groups.
+    ///
+    /// # Example: a custom [`PageStore`]
+    ///
+    /// A store only has to map an opaque, store-allocated [`PageKey`] to a blob
+    /// and hand the blob back once. The keys need not be dense or sequential —
+    /// here a `HashMap`-backed store mints sparse handles, proving the writer
+    /// relies only on the opaque-handle contract. A real spilling backend would
+    /// write the bytes to a temp file in `put` and read them back in `take`.
+    ///
+    /// ```
+    /// # use std::collections::HashMap;
+    /// # use std::sync::Arc;
+    /// # use bytes::Bytes;
+    /// # use arrow_array::{ArrayRef, Int64Array, RecordBatch};
+    /// # use parquet::arrow::arrow_writer::{
+    /// #     ArrowWriter, ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
+    /// # };
+    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
+    /// # use parquet::errors::{ParquetError, Result};
+    /// #[derive(Default)]
+    /// struct MapPageStore {
+    ///     blobs: HashMap<u64, Bytes>,
+    ///     next: u64,
+    /// }
+    ///
+    /// impl PageStore for MapPageStore {
+    ///     fn put(&mut self, value: Bytes) -> Result<PageKey> {
+    ///         // Mint a sparse handle (every other integer) to show the writer
+    ///         // never assumes anything about the key's value.
+    ///         let key = PageKey::new(self.next);
+    ///         self.next += 2;
+    ///         self.blobs.insert(key.get(), value);
+    ///         Ok(key)
+    ///     }
+    ///
+    ///     fn take(&mut self, key: PageKey) -> Result<Bytes> {
+    ///         self.blobs
+    ///             .remove(&key.get())
+    ///             .ok_or_else(|| ParquetError::General(format!("invalid key {}", key.get())))
+    ///     }
+    /// }
+    ///
+    /// #[derive(Debug)]
+    /// struct MapPageStoreFactory;
+    ///
+    /// impl PageStoreFactory for MapPageStoreFactory {
+    ///     fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+    ///         // `args` exposes the column index and descriptor (physical/logical
+    ///         // type, path), so a real backend could spill only large columns.
+    ///         let _ = (args.column_index(), args.column_descriptor());
+    ///         Ok(Box::new(MapPageStore::default()))
+    ///     }
+    /// }
+    ///
+    /// let col = Arc::new(Int64Array::from_iter_values(0..1000)) as ArrayRef;
+    /// let to_write = RecordBatch::try_from_iter([("col", col)]).unwrap();
+    ///
+    /// let options =
+    ///     ArrowWriterOptions::new().with_page_store_factory(Arc::new(MapPageStoreFactory));
+    /// let mut buffer = Vec::new();
+    /// let mut writer =
+    ///     ArrowWriter::try_new_with_options(&mut buffer, to_write.schema(), options).unwrap();
+    /// writer.write(&to_write).unwrap();
+    /// writer.close().unwrap();
+    ///
+    /// // The file is byte-identical to one written with the default store.
+    /// let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 1024).unwrap();
+    /// assert_eq!(to_write, reader.next().unwrap().unwrap());
+    /// ```
+    pub fn with_page_store_factory(self, page_store_factory: Arc<dyn PageStoreFactory>) -> Self {
+        Self {
+            page_store_factory: Some(page_store_factory),
+            ..self
+        }
+    }
+
     /// Skip encoding the embedded arrow metadata (defaults to `false`)
     ///
     /// Parquet files generated by the [`ArrowWriter`] contain embedded arrow schema
@@ -603,52 +697,108 @@ impl ArrowWriterOptions {
     }
 }
 
-/// A single column chunk produced by [`ArrowColumnWriter`]
-#[derive(Default)]
+/// A single column chunk produced by [`ArrowColumnWriter`].
+///
+/// Holds the serialized page blobs (each page's header ‖ compressed data, in
+/// write order) in a [`PageStore`], plus the handles needed to read them back,
+/// in order, when the chunk is spliced into the output file.
 struct ArrowColumnChunkData {
     length: usize,
-    data: Vec<Bytes>,
+    store: Box<dyn PageStore>,
+    keys: Vec<PageKey>,
+    /// The dictionary page's serialized blobs (header ‖ data), held in memory
+    /// rather than the store.
+    ///
+    /// A dictionary page is produced at most once and bounded by
+    /// `dict_page_size_limit`, but it must be written *first* in the chunk even
+    /// though the data pages reach the writer before it (see
+    /// [`PageWriter::defers_dictionary_ordering`]). Spilling it would only
+    /// round-trip ~1 page to the backend and straight back, so it is kept here
+    /// and emitted ahead of the data pages at splice. Empty for non-dictionary
+    /// columns.
+    dictionary: Vec<Bytes>,
 }
 
-impl Length for ArrowColumnChunkData {
-    fn len(&self) -> u64 {
-        self.length as _
+impl ArrowColumnChunkData {
+    fn new(store: Box<dyn PageStore>) -> Self {
+        Self {
+            length: 0,
+            store,
+            keys: Vec::new(),
+            dictionary: Vec::new(),
+        }
     }
-}
 
-impl ChunkReader for ArrowColumnChunkData {
-    type T = ArrowColumnChunkReader;
+    /// Append a data-page blob to the store, recording its handle in write
+    /// order.
+    fn push(&mut self, value: Bytes) -> Result<()> {
+        let key = self.store.put(value)?;
+        self.keys.push(key);
+        Ok(())
+    }
 
-    fn get_read(&self, start: u64) -> Result<Self::T> {
-        assert_eq!(start, 0); // Assume append_column writes all data in one-shot
-        Ok(ArrowColumnChunkReader(
-            self.data.clone().into_iter().peekable(),
-        ))
+    /// Retain a dictionary-page blob in memory (emitted first at splice).
+    fn push_dictionary(&mut self, value: Bytes) {
+        self.dictionary.push(value);
     }
 
-    fn get_bytes(&self, _start: u64, _length: usize) -> Result<Bytes> {
-        unimplemented!()
+    /// Total serialized size of the in-memory dictionary page, in bytes.
+    fn dictionary_len(&self) -> usize {
+        self.dictionary.iter().map(Bytes::len).sum()
     }
+
+    /// Bytes this chunk currently holds on the heap: whatever the store keeps
+    /// resident (zero for a spilling backend) plus the in-memory dictionary
+    /// page.
+    fn memory_size(&self) -> usize {
+        self.store.memory_size() + self.dictionary_len()
+    }
+}
+
+/// A streaming [`Read`] over one column chunk's buffered pages, in final file
+/// order: the in-memory dictionary page (if any) first, then the data pages.
+///
+/// Each data-page blob is taken back out of the [`PageStore`] *as it is
+/// consumed* and released immediately afterwards, so splicing a chunk into the
+/// output file never materializes more than a single page in memory at a time.
+/// This is what keeps the splice phase within the memory bound for a spilling
+/// backend (an in-memory store already holds the bytes, so it is unaffected).
+struct StreamingColumnChunkReader {
+    /// Dictionary-page blobs, emitted before any data page.
+    dictionary: IntoIter<Bytes>,
+    store: Box<dyn PageStore>,
+    keys: IntoIter<PageKey>,
+    /// The blob currently being drained into the output; emptied as it is read.
+    current: Bytes,
 }
 
-/// A [`Read`] for [`ArrowColumnChunkData`]
-struct ArrowColumnChunkReader(Peekable<IntoIter<Bytes>>);
+impl StreamingColumnChunkReader {
+    fn new(data: ArrowColumnChunkData) -> Self {
+        Self {
+            dictionary: data.dictionary.into_iter(),
+            store: data.store,
+            keys: data.keys.into_iter(),
+            current: Bytes::new(),
+        }
+    }
+}
 
-impl Read for ArrowColumnChunkReader {
+impl Read for StreamingColumnChunkReader {
     fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
-        let buffer = loop {
-            match self.0.peek_mut() {
-                Some(b) if b.is_empty() => {
-                    self.0.next();
-                    continue;
-                }
-                Some(b) => break b,
-                None => return Ok(0),
+        // Refill from the next blob whenever the current one is drained: the
+        // dictionary page first, then each data page from the store.
+        while self.current.is_empty() {
+            if let Some(blob) = self.dictionary.next() {
+                self.current = blob;
+            } else if let Some(key) = self.keys.next() {
+                self.current = self.store.take(key).map_err(std::io::Error::other)?;
+            } else {
+                return Ok(0);
             }
-        };
+        }
 
-        let len = buffer.len().min(out.len());
-        let b = buffer.split_to(len);
+        let len = self.current.len().min(out.len());
+        let b = self.current.split_to(len);
         out[..len].copy_from_slice(&b);
         Ok(len)
     }
@@ -660,7 +810,6 @@ impl Read for ArrowColumnChunkReader {
 /// [`ArrowRowGroupWriter`] on flush, without requiring self-referential borrows
 type SharedColumnChunk = Arc<Mutex<ArrowColumnChunkData>>;
 
-#[derive(Default)]
 struct ArrowPageWriter {
     buffer: SharedColumnChunk,
     #[cfg(feature = "encryption")]
@@ -668,6 +817,15 @@ struct ArrowPageWriter {
 }
 
 impl ArrowPageWriter {
+    /// Create a page writer that buffers completed pages in `store`.
+    fn new(store: Box<dyn PageStore>) -> Self {
+        Self {
+            buffer: Arc::new(Mutex::new(ArrowColumnChunkData::new(store))),
+            #[cfg(feature = "encryption")]
+            page_encryptor: None,
+        }
+    }
+
     #[cfg(feature = "encryption")]
     pub fn with_encryptor(mut self, page_encryptor: Option<PageEncryptor>) -> Self {
         self.page_encryptor = page_encryptor;
@@ -726,12 +884,33 @@ impl PageWriter for ArrowPageWriter {
         spec.bytes_written = compressed_size as u64;
 
         buf.length += compressed_size;
-        buf.data.push(header);
-        buf.data.push(data);
+        if spec.page_type == PageType::DICTIONARY_PAGE {
+            // Held in memory and emitted first at splice — see
+            // `ArrowColumnChunkData::dictionary`.
+            buf.push_dictionary(header);
+            buf.push_dictionary(data);
+        } else {
+            buf.push(header)?;
+            buf.push(data)?;
+        }
 
         Ok(spec)
     }
 
+    fn defers_dictionary_ordering(&self) -> bool {
+        // The Arrow chunk is buffered in full and spliced at row-group flush, so
+        // data pages may be accepted before the dictionary page and reordered
+        // then. This lets `GenericColumnWriter` stream dictionary-column data
+        // pages straight through instead of buffering them in memory.
+        true
+    }
+
+    fn buffered_memory_size(&self) -> usize {
+        // Only what is actually resident: a spilling store reports ~0 here even
+        // though the chunk's bytes have all passed through it.
+        self.buffer.try_lock().unwrap().memory_size()
+    }
+
     fn close(&mut self) -> Result<()> {
         Ok(())
     }
@@ -785,12 +964,21 @@ impl ArrowColumnChunk {
         &mut self.close
     }
 
-    /// Calls [`SerializedRowGroupWriter::append_column`] with this column's data
+    /// Splices this column's buffered pages into the row group, streaming them
+    /// back out of the [`PageStore`] one page at a time.
     pub fn append_to_row_group<W: Write + Send>(
         self,
         writer: &mut SerializedRowGroupWriter<'_, W>,
     ) -> Result<()> {
-        writer.append_column(&self.data, self.close)
+        let ArrowColumnChunk { data, close } = self;
+
+        // The dictionary page is produced *after* the data pages on this path (so
+        // they can stream straight through) but must be written *first*, so move
+        // it ahead of the data pages in the recorded offsets before the splice.
+        let close = close.update_dictionary_location(data.dictionary_len())?;
+
+        let reader = StreamingColumnChunkReader::new(data);
+        writer.append_column_from_read(reader, close)
     }
 }
 
@@ -1082,6 +1270,7 @@ pub struct ArrowRowGroupWriterFactory {
     schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     props: WriterPropertiesPtr,
+    page_store_factory: Arc<dyn PageStoreFactory>,
     #[cfg(feature = "encryption")]
     file_encryptor: Option<Arc<FileEncryptor>>,
 }
@@ -1098,11 +1287,23 @@ impl ArrowRowGroupWriterFactory {
             schema,
             arrow_schema,
             props,
+            page_store_factory: Arc::new(InMemoryPageStoreFactory),
             #[cfg(feature = "encryption")]
             file_encryptor: file_writer.file_encryptor(),
         }
     }
 
+    /// Set the [`PageStoreFactory`] used to allocate the buffer for each column
+    /// chunk, e.g. to spill completed pages to a temp file or object storage
+    /// instead of the heap. Defaults to [`InMemoryPageStoreFactory`].
+    pub fn with_page_store_factory(
+        mut self,
+        page_store_factory: Arc<dyn PageStoreFactory>,
+    ) -> Self {
+        self.page_store_factory = page_store_factory;
+        self
+    }
+
     fn create_row_group_writer(&self, row_group_index: usize) -> Result<ArrowRowGroupWriter> {
         let writers = self.create_column_writers(row_group_index)?;
         Ok(ArrowRowGroupWriter::new(writers, &self.arrow_schema))
@@ -1127,12 +1328,13 @@ impl ArrowRowGroupWriterFactory {
     #[cfg(feature = "encryption")]
     fn column_writer_factory(&self, row_group_idx: usize) -> ArrowColumnWriterFactory {
         ArrowColumnWriterFactory::new()
+            .with_page_store_factory(self.page_store_factory.clone())
             .with_file_encryptor(row_group_idx, self.file_encryptor.clone())
     }
 
     #[cfg(not(feature = "encryption"))]
     fn column_writer_factory(&self, _row_group_idx: usize) -> ArrowColumnWriterFactory {
-        ArrowColumnWriterFactory::new()
+        ArrowColumnWriterFactory::new().with_page_store_factory(self.page_store_factory.clone())
     }
 }
 
@@ -1159,6 +1361,8 @@ pub fn get_column_writers(
 
 /// Creates [`ArrowColumnWriter`] instances
 struct ArrowColumnWriterFactory {
+    /// Allocates the per-column-chunk [`PageStore`] backing each page writer.
+    page_store_factory: Arc<dyn PageStoreFactory>,
     #[cfg(feature = "encryption")]
     row_group_index: usize,
     #[cfg(feature = "encryption")]
@@ -1168,6 +1372,7 @@ struct ArrowColumnWriterFactory {
 impl ArrowColumnWriterFactory {
     pub fn new() -> Self {
         Self {
+            page_store_factory: Arc::new(InMemoryPageStoreFactory),
             #[cfg(feature = "encryption")]
             row_group_index: 0,
             #[cfg(feature = "encryption")]
@@ -1175,6 +1380,15 @@ impl ArrowColumnWriterFactory {
         }
     }
 
+    /// Use `page_store_factory` to allocate the buffer for each column chunk.
+    pub fn with_page_store_factory(
+        mut self,
+        page_store_factory: Arc<dyn PageStoreFactory>,
+    ) -> Self {
+        self.page_store_factory = page_store_factory;
+        self
+    }
+
     #[cfg(feature = "encryption")]
     pub fn with_file_encryptor(
         mut self,
@@ -1199,18 +1413,22 @@ impl ArrowColumnWriterFactory {
             column_index,
             &column_path,
         )?;
+        let args = PageStoreArgs::new(column_index, column_descriptor);
+        let store = self.page_store_factory.create(&args)?;
         Ok(Box::new(
-            ArrowPageWriter::default().with_encryptor(page_encryptor),
+            ArrowPageWriter::new(store).with_encryptor(page_encryptor),
         ))
     }
 
     #[cfg(not(feature = "encryption"))]
     fn create_page_writer(
         &self,
-        _column_descriptor: &ColumnDescPtr,
-        _column_index: usize,
+        column_descriptor: &ColumnDescPtr,
+        column_index: usize,
     ) -> Result<Box<ArrowPageWriter>> {
-        Ok(Box::<ArrowPageWriter>::default())
+        let args = PageStoreArgs::new(column_index, column_descriptor);
+        let store = self.page_store_factory.create(&args)?;
+        Ok(Box::new(ArrowPageWriter::new(store)))
     }
 
     /// Gets an [`ArrowColumnWriter`] for the given `data_type`, appending the
@@ -1738,6 +1956,141 @@ mod tests {
         statistics::Statistics,
     };
 
+    /// A [`PageStore`] that allocates *sparse, non-contiguous* handles and keeps
+    /// blobs in a `HashMap` — nothing like the default `Vec<Bytes>`. Used to
+    /// prove the writer relies only on the opaque-handle contract and never on
+    /// handles being dense `Vec` indices. Records how many blobs were stored.
+    #[derive(Debug, Default)]
+    struct RecordingPageStore {
+        next: u64,
+        blobs: HashMap<u64, Bytes>,
+        puts: Arc<std::sync::atomic::AtomicUsize>,
+    }
+
+    impl PageStore for RecordingPageStore {
+        fn put(&mut self, value: Bytes) -> Result<PageKey> {
+            // Deliberately non-sequential, never-zero handles.
+            let id = 100 + self.next * 7;
+            self.next += 1;
+            self.puts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            self.blobs.insert(id, value);
+            Ok(PageKey::new(id))
+        }
+
+        fn take(&mut self, key: PageKey) -> Result<Bytes> {
+            self.blobs
+                .remove(&key.get())
+                .ok_or_else(|| ParquetError::General(format!("missing key {}", key.get())))
+        }
+    }
+
+    #[derive(Debug)]
+    struct RecordingPageStoreFactory {
+        puts: Arc<std::sync::atomic::AtomicUsize>,
+    }
+
+    impl PageStoreFactory for RecordingPageStoreFactory {
+        fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+            Ok(Box::new(RecordingPageStore {
+                puts: self.puts.clone(),
+                ..Default::default()
+            }))
+        }
+    }
+
+    /// A custom [`PageStore`] must produce byte-identical files to the in-memory
+    /// default, across dictionary and non-dictionary columns and multiple row
+    /// groups (so multiple store instances are exercised).
+    #[test]
+    fn custom_page_store_is_byte_identical_to_default() {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("i", DataType::Int32, true),
+            // A low-cardinality string column to exercise the dictionary path.
+            Field::new("s", DataType::Utf8, true),
+        ]));
+        let i = Int32Array::from(vec![Some(1), None, Some(3), Some(4), Some(5), Some(6)]);
+        let s = StringArray::from(vec![
+            Some("a"),
+            Some("bb"),
+            Some("a"),
+            None,
+            Some("bb"),
+            Some("ccc"),
+        ]);
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(i), Arc::new(s)]).unwrap();
+
+        // Small row groups so multiple column chunks (hence multiple store
+        // instances) are produced.
+        let props = WriterProperties::builder()
+            .set_max_row_group_row_count(Some(3))
+            .build();
+
+        let write = |factory: Option<Arc<dyn PageStoreFactory>>| {
+            let mut buffer = Vec::new();
+            let mut opts = ArrowWriterOptions::new().with_properties(props.clone());
+            if let Some(factory) = factory {
+                opts = opts.with_page_store_factory(factory);
+            }
+            let mut writer =
+                ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
+            writer.write(&batch).unwrap();
+            writer.close().unwrap();
+            buffer
+        };
+
+        let default_bytes = write(None);
+
+        let puts = Arc::new(std::sync::atomic::AtomicUsize::new(0));
+        let custom_bytes = write(Some(Arc::new(RecordingPageStoreFactory {
+            puts: puts.clone(),
+        })));
+
+        assert!(
+            puts.load(std::sync::atomic::Ordering::Relaxed) > 0,
+            "custom PageStore was never written to"
+        );
+        assert_eq!(
+            default_bytes, custom_bytes,
+            "a custom PageStore must produce byte-identical output to the default"
+        );
+    }
+
+    /// A dictionary-encoded column written through the deferred-ordering Arrow
+    /// path must round-trip correctly even with the offset index disabled, when
+    /// only the chunk-level dictionary/data page offsets are rewritten (there is
+    /// no offset index to rebuild). Spans multiple data pages so the
+    /// dictionary-first reordering is exercised.
+    #[test]
+    fn dictionary_column_round_trips_with_offset_index_disabled() {
+        let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, true)]));
+
+        // Low cardinality so the column stays dictionary-encoded; enough rows to
+        // span several data pages within a single row group.
+        let values: Vec<Option<i32>> = (0..50_000).map(|i| Some(i % 8)).collect();
+        let array = Int32Array::from(values.clone());
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_offset_index_disabled(true)
+            .set_data_page_row_count_limit(4096)
+            .build();
+        let opts = ArrowWriterOptions::new().with_properties(props);
+
+        let mut buffer = Vec::new();
+        let mut writer =
+            ArrowWriter::try_new_with_options(&mut buffer, schema.clone(), opts).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let reader = ParquetRecordBatchReader::try_new(Bytes::from(buffer), values.len()).unwrap();
+        let read: Vec<RecordBatch> = reader.collect::<ArrowResult<_>>().unwrap();
+        let read_values: Vec<Option<i32>> = read
+            .iter()
+            .flat_map(|b| b.column(0).as_primitive::<Int32Type>().iter())
+            .collect();
+        assert_eq!(read_values, values);
+    }
+
     #[test]
     fn arrow_writer() {
         // define schema
diff --git a/parquet/src/column/mod.rs b/parquet/src/column/mod.rs
index 115c8dd01b..9c7e77d29c 100644
--- a/parquet/src/column/mod.rs
+++ b/parquet/src/column/mod.rs
@@ -125,5 +125,6 @@ pub(crate) mod page_encryption;
 #[cfg(not(feature = "encryption"))]
 #[path = "page_encryption_disabled.rs"]
 pub(crate) mod page_encryption;
+pub mod page_store;
 pub mod reader;
 pub mod writer;
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 4cfc07a028..ed80e279a0 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -197,6 +197,15 @@ impl CompressedPage {
         self.compressed_page.buffer()
     }
 
+    /// Returns the number of heap bytes this page currently holds.
+    ///
+    /// This is the page's compressed buffer (the embedded [`Bytes`]); use it to
+    /// account for a buffered page's memory footprint rather than reaching for
+    /// `data().len()` at each call site.
+    pub fn memory_usage(&self) -> usize {
+        self.compressed_page.buffer().len()
+    }
+
     /// Returns the thrift page header
     pub(crate) fn to_thrift_header(&self) -> Result<PageHeader> {
         let uncompressed_size = self.uncompressed_size();
@@ -430,6 +439,55 @@ pub trait PageWriter: Send {
     /// either data page or dictionary page.
     fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
 
+    /// **Unstable, not public API.** This is an internal protocol between
+    /// [`GenericColumnWriter`] and its in-crate page writers; it is hidden from
+    /// the rendered docs and may change or be removed without a major version
+    /// bump. External `PageWriter` implementations should not override it. See
+    /// the page-spilling cleanup tracked in
+    /// <https://github.com/apache/arrow-rs/pull/10020>.
+    ///
+    /// Whether this writer resolves the final page layout itself (at flush)
+    /// rather than committing bytes to their final position as pages arrive.
+    ///
+    /// The dictionary page of a column chunk must be written *first*, but it is
+    /// not finalized until every value has been seen. A writer that commits
+    /// bytes live (e.g. straight to a file) therefore relies on the column
+    /// writer buffering the dictionary-encoded data pages in memory until the
+    /// dictionary page is ready — see [`GenericColumnWriter`]'s `data_pages`.
+    ///
+    /// A writer that instead buffers the whole chunk and splices it later (the
+    /// [`ArrowWriter`] path) can accept data pages *before* the dictionary page
+    /// and order them itself at flush. Returning `true` tells the column writer
+    /// to skip that in-memory buffering and stream dictionary-column data pages
+    /// straight through, bounding the column writer's memory.
+    ///
+    /// [`GenericColumnWriter`]: crate::column::writer::GenericColumnWriter
+    /// [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+    #[doc(hidden)]
+    fn defers_dictionary_ordering(&self) -> bool {
+        false
+    }
+
+    /// **Unstable, not public API.** Companion to
+    /// [`defers_dictionary_ordering`](Self::defers_dictionary_ordering): an
+    /// internal hook for the column writer's memory accounting, hidden from the
+    /// rendered docs and subject to change or removal without a major version
+    /// bump. External `PageWriter` implementations should not override it.
+    ///
+    /// The number of bytes this writer is currently holding **in memory** for
+    /// pages it has been handed (i.e. completed pages not yet committed to their
+    /// final destination).
+    ///
+    /// Used by the column writer to report its memory footprint. The default is
+    /// `0`: a writer that streams pages straight to their destination retains
+    /// nothing. A writer that buffers pages should report what it actually holds
+    /// on the heap — which, when it spills to a backing store, can be far less
+    /// than the bytes written.
+    #[doc(hidden)]
+    fn buffered_memory_size(&self) -> usize {
+        0
+    }
+
     /// Closes resources and flushes underlying sink.
     /// Page writer should not be used after this method is called.
     fn close(&mut self) -> Result<()>;
diff --git a/parquet/src/column/page_store.rs b/parquet/src/column/page_store.rs
new file mode 100644
index 0000000000..4f821c0e5c
--- /dev/null
+++ b/parquet/src/column/page_store.rs
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Pluggable storage for completed, serialized page blobs.
+//!
+//! While a row group is being written the [`ArrowWriter`] must buffer every
+//! column's encoded pages, because Parquet requires each column chunk to be
+//! contiguous in the file while record batches arrive with all columns interleaved.
+//! By default that buffer lives on the heap, so the writer's peak memory grows
+//! with the row group size. A [`PageStore`] lets the buffer live somewhere else
+//! — a local temp file, object storage, etc. — bounding peak write memory
+//! independently of the row group size.
+//!
+//! [`ArrowWriter`]: crate::arrow::arrow_writer::ArrowWriter
+
+use std::fmt::Debug;
+
+use bytes::Bytes;
+
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescriptor;
+
+/// An opaque, store-allocated handle to a blob held by a [`PageStore`].
+///
+/// Handles are allocated by the store and are only meaningful to the store
+/// that produced them. The default in-memory store allocates them densely and
+/// sequentially, but the caller treats them as opaque tokens.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct PageKey(u64);
+
+impl PageKey {
+    /// Create a handle wrapping `raw`.
+    ///
+    /// A [`PageStore`] implementation calls this to mint the handle it returns
+    /// from [`put`](PageStore::put). The value is opaque to the caller, so a
+    /// store is free to use a dense counter, a packed locator, or anything else
+    /// it can later resolve in [`take`](PageStore::take).
+    pub const fn new(raw: u64) -> Self {
+        Self(raw)
+    }
+
+    /// The raw value passed to [`new`](Self::new).
+    pub const fn get(self) -> u64 {
+        self.0
+    }
+}
+
+/// A pluggable store for completed, serialized page blobs.
+///
+/// The store is intentionally "dumb": it only maps an opaque [`PageKey`] to a
+/// blob of bytes. It knows nothing about pages, dictionaries, ordering, or
+/// offsets. The caller keeps the handles it gets back from [`put`](Self::put)
+/// and decides what they mean.
+///
+/// Each store instance is owned by a single column writer and mutated by one
+/// thread at a time (both methods take `&mut self`), so it needs no internal
+/// synchronization — hence only `Send`, not `Sync`.
+///
+/// The default ([`InMemoryPageStore`]) keeps blobs on the heap. Configure a
+/// different backend via
+/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStore: Send {
+    /// Store `value`, returning a handle that can later be passed to
+    /// [`take`](Self::take).
+    fn put(&mut self, value: Bytes) -> Result<PageKey>;
+
+    /// Take back the blob previously stored under `key`.
+    ///
+    /// The caller takes ownership of the returned bytes and will **not** request
+    /// `key` again, so the store may release any resources backing it — eagerly
+    /// here, or when the store is dropped.
+    fn take(&mut self, key: PageKey) -> Result<Bytes>;
+
+    /// The number of bytes this store currently holds **in memory** (resident
+    /// on the heap), used to report the writer's memory footprint.
+    ///
+    /// The default is `0`, which is exactly right for a backend that moves
+    /// every blob off-heap (a temp file, object storage): the bytes it has been
+    /// handed no longer occupy heap. The in-memory backend overrides this to
+    /// report its resident blobs. A backend that keeps a partial in-memory
+    /// buffer should report that buffer's size.
+    fn memory_size(&self) -> usize {
+        0
+    }
+}
+
+/// Context for a single [`PageStoreFactory::create`] call.
+///
+/// Describes the leaf column chunk the store will buffer. It is held by
+/// reference for the duration of the call; a backend reads only what it needs.
+/// More fields may be added in future releases without breaking existing
+/// implementations — the type is constructed only by the writer, so an
+/// implementer only ever receives one and calls its accessors.
+pub struct PageStoreArgs<'a> {
+    column_index: usize,
+    column_descriptor: &'a ColumnDescriptor,
+}
+
+impl<'a> PageStoreArgs<'a> {
+    // Constructed only by the Arrow writer; without that feature there is no caller.
+    #[cfg(feature = "arrow")]
+    pub(crate) fn new(column_index: usize, column_descriptor: &'a ColumnDescriptor) -> Self {
+        Self {
+            column_index,
+            column_descriptor,
+        }
+    }
+
+    /// Index of the leaf column within the row group.
+    ///
+    /// A backend may use this to e.g. name spill files or shard across a bounded
+    /// pool; it carries no ordering or coordination requirement.
+    pub fn column_index(&self) -> usize {
+        self.column_index
+    }
+
+    /// Descriptor for the leaf column: physical/logical type, path, and max
+    /// definition/repetition levels.
+    ///
+    /// Lets a backend tailor buffering to the column — for example spilling only
+    /// large `BYTE_ARRAY` columns while keeping small fixed-width ones on the
+    /// heap.
+    pub fn column_descriptor(&self) -> &ColumnDescriptor {
+        self.column_descriptor
+    }
+}
+
+/// Creates a fresh [`PageStore`] for each column chunk.
+///
+/// See
+/// [`ArrowWriterOptions::with_page_store_factory`](crate::arrow::arrow_writer::ArrowWriterOptions::with_page_store_factory).
+pub trait PageStoreFactory: Send + Sync + Debug {
+    /// Create a new, empty [`PageStore`] for the leaf column described by `args`.
+    fn create(&self, args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>>;
+}
+
+/// The default [`PageStore`], holding blobs on the heap in a `Vec<Bytes>`.
+///
+/// Peak memory grows with the row group size; use a spilling backend to bound
+/// it.
+#[derive(Debug, Default)]
+pub struct InMemoryPageStore {
+    blobs: Vec<Bytes>,
+    /// Running total of resident blob bytes, kept in step with `put`/`take`.
+    resident: usize,
+}
+
+impl PageStore for InMemoryPageStore {
+    fn put(&mut self, value: Bytes) -> Result<PageKey> {
+        let key = PageKey(self.blobs.len() as u64);
+        self.resident += value.len();
+        self.blobs.push(value);
+        Ok(key)
+    }
+
+    fn take(&mut self, key: PageKey) -> Result<Bytes> {
+        // Replace the slot with an empty `Bytes` so the stored blob is released
+        // as soon as it is taken, keeping memory bounded while the chunk is
+        // streamed into the output file.
+        let blob = self
+            .blobs
+            .get_mut(key.0 as usize)
+            .map(std::mem::take)
+            .ok_or_else(|| ParquetError::General(format!("invalid page key {}", key.0)))?;
+        self.resident -= blob.len();
+        Ok(blob)
+    }
+
+    fn memory_size(&self) -> usize {
+        self.resident
+    }
+}
+
+/// Factory for [`InMemoryPageStore`] — the default used by
+/// [`ArrowWriter`](crate::arrow::arrow_writer::ArrowWriter).
+#[derive(Debug, Default)]
+pub struct InMemoryPageStoreFactory;
+
+impl PageStoreFactory for InMemoryPageStoreFactory {
+    fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+        Ok(Box::new(InMemoryPageStore::default()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn in_memory_round_trips_blobs_in_handle_order() {
+        let mut store = InMemoryPageStore::default();
+        let k0 = store.put(Bytes::from_static(b"hello")).unwrap();
+        let k1 = store.put(Bytes::from_static(b"world")).unwrap();
+        assert_ne!(k0, k1);
+        assert_eq!(&store.take(k0).unwrap()[..], b"hello");
+        assert_eq!(&store.take(k1).unwrap()[..], b"world");
+    }
+
+    #[test]
+    fn in_memory_take_releases_the_slot() {
+        let mut store = InMemoryPageStore::default();
+        let k = store.put(Bytes::from_static(b"abc")).unwrap();
+        assert_eq!(&store.take(k).unwrap()[..], b"abc");
+        // A second take yields the emptied placeholder rather than the blob,
+        // confirming the bytes were released on the first take.
+        assert!(store.take(k).unwrap().is_empty());
+    }
+
+    #[test]
+    fn in_memory_invalid_key_errors() {
+        let mut store = InMemoryPageStore::default();
+        assert!(store.take(PageKey(99)).is_err());
+    }
+
+    #[test]
+    fn in_memory_reports_resident_bytes() {
+        let mut store = InMemoryPageStore::default();
+        assert_eq!(store.memory_size(), 0);
+        let k0 = store.put(Bytes::from_static(b"hello")).unwrap();
+        let k1 = store.put(Bytes::from_static(b"!")).unwrap();
+        assert_eq!(store.memory_size(), 6);
+        store.take(k0).unwrap();
+        assert_eq!(store.memory_size(), 1);
+        store.take(k1).unwrap();
+        assert_eq!(store.memory_size(), 0);
+    }
+
+    #[test]
+    fn default_store_memory_size_is_zero() {
+        // A spilling backend that does not override `memory_size` reports 0,
+        // reflecting that its blobs no longer occupy the heap.
+        struct OffHeap;
+        impl PageStore for OffHeap {
+            fn put(&mut self, _value: Bytes) -> Result<PageKey> {
+                Ok(PageKey::new(0))
+            }
+            fn take(&mut self, _key: PageKey) -> Result<Bytes> {
+                Ok(Bytes::new())
+            }
+        }
+        assert_eq!(OffHeap.memory_size(), 0);
+    }
+}
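
The following sketch is not part of this commit. It illustrates the accounting rule stated in the `memory_size` docs above: a backend that keeps only part of its data on the heap should report just that resident part. Everything here is a hypothetical stand-in: `Key` mimics `PageKey`, `Vec<u8>` replaces `Bytes`, `ThresholdStore` and its `threshold` parameter are invented for illustration, and the spill target is any `Read + Write + Seek` value (a `File` in practice, a `Cursor` in tests). Unlike `InMemoryPageStore`, this sketch chooses to reject a second `take` of the same key.

```rust
use std::io::{self, Read, Seek, SeekFrom, Write};

/// Hypothetical stand-in for the crate's `PageKey` (illustration only).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Key(u64);

/// Where a stored blob currently lives.
enum Slot {
    Resident(Vec<u8>),
    Spilled { offset: u64, len: usize },
    Taken,
}

/// Keeps blobs of at most `threshold` bytes on the heap and appends larger
/// ones to `spill`. Only the resident blobs count toward `memory_size`.
pub struct ThresholdStore<F> {
    spill: F,
    spill_end: u64,
    threshold: usize,
    slots: Vec<Slot>,
    resident: usize,
}

impl<F: Read + Write + Seek> ThresholdStore<F> {
    pub fn new(spill: F, threshold: usize) -> Self {
        Self { spill, spill_end: 0, threshold, slots: Vec::new(), resident: 0 }
    }

    pub fn put(&mut self, value: Vec<u8>) -> io::Result<Key> {
        let key = Key(self.slots.len() as u64);
        if value.len() <= self.threshold {
            self.resident += value.len();
            self.slots.push(Slot::Resident(value));
        } else {
            // Always append at the logical end; a prior `take` may have
            // moved the cursor.
            self.spill.seek(SeekFrom::Start(self.spill_end))?;
            self.spill.write_all(&value)?;
            self.slots.push(Slot::Spilled { offset: self.spill_end, len: value.len() });
            self.spill_end += value.len() as u64;
        }
        Ok(key)
    }

    pub fn take(&mut self, key: Key) -> io::Result<Vec<u8>> {
        let slot = self
            .slots
            .get_mut(key.0 as usize)
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "invalid page key"))?;
        match std::mem::replace(slot, Slot::Taken) {
            Slot::Resident(bytes) => {
                self.resident -= bytes.len();
                Ok(bytes)
            }
            Slot::Spilled { offset, len } => {
                let mut buf = vec![0u8; len];
                self.spill.seek(SeekFrom::Start(offset))?;
                self.spill.read_exact(&mut buf)?;
                Ok(buf)
            }
            Slot::Taken => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "page key already taken",
            )),
        }
    }

    /// Bytes resident on the heap; spilled blobs are deliberately excluded.
    pub fn memory_size(&self) -> usize {
        self.resident
    }
}
```

With a threshold of 4 bytes, storing a 3-byte blob and a 10-byte blob leaves `memory_size()` at 3, because the larger blob lives only in the spill target.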
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 090350f53a..aa9cef16c5 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -209,6 +209,37 @@ pub struct ColumnCloseResult {
     pub offset_index: Option<OffsetIndexMetaData>,
 }
 
+impl ColumnCloseResult {
+    /// Rewrite the page offsets for a dictionary-first on-disk layout.
+    ///
+    /// A writer that buffers the whole column chunk and splices it later (the
+    /// Arrow path) may accept the data pages *before* the dictionary page so the
+    /// data pages can stream straight through, then emit the dictionary page
+    /// first at splice. The offsets recorded during encoding therefore assume a
+    /// data-pages-first layout; call this with the serialized length of the
+    /// dictionary page to move it to offset 0 and shift every data page after
+    /// it. A `dictionary_len` of 0 (no dictionary page) leaves the result
+    /// unchanged.
+    pub fn update_dictionary_location(mut self, dictionary_len: usize) -> Result<Self> {
+        if dictionary_len > 0 {
+            self.metadata = self
+                .metadata
+                .into_builder()
+                .set_dictionary_page_offset(Some(0))
+                .set_data_page_offset(dictionary_len as i64)
+                .build()?;
+            if let Some(offset_index) = self.offset_index.as_mut() {
+                let mut offset = dictionary_len as i64;
+                for location in offset_index.page_locations.iter_mut() {
+                    location.offset = offset;
+                    offset += location.compressed_page_size as i64;
+                }
+            }
+        }
+        Ok(self)
+    }
+}
+
 // Metrics per page
 #[derive(Default)]
 struct PageMetrics {
@@ -685,7 +716,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// of the current memory usage and not the final anticipated encoded size.
     #[cfg(feature = "arrow")]
     pub(crate) fn memory_size(&self) -> usize {
-        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
+        // In-flight encoder buffers, plus any completed pages still held on the
+        // heap: the dictionary-column data pages buffered here (column-at-a-time
+        // path), plus whatever the page writer keeps resident. A page writer
+        // that spills completed pages off-heap reports far less than the bytes
+        // it was handed, so this tracks real memory rather than bytes written.
+        self.encoder.estimated_memory_size()
+            + self
+                .data_pages
+                .iter()
+                .map(|page| page.memory_usage())
+                .sum::<usize>()
+            + self.page_writer.buffered_memory_size()
     }
 
     /// Returns total number of bytes written by this column writer so far.
@@ -1387,7 +1429,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         // Check if we need to buffer data page or flush it to the sink directly.
-        if self.encoder.has_dictionary() {
+        //
+        // For dictionary-encoded columns the dictionary page must be written
+        // first, but it is not final until all values are seen, so completed
+        // data pages are normally buffered here until `close`. A page writer
+        // that defers final layout (the Arrow path) instead orders pages itself
+        // at flush, so we stream the data pages straight through and never let
+        // them accumulate in memory.
+        if self.encoder.has_dictionary() && !self.page_writer.defers_dictionary_ordering() {
             self.data_pages.push_back(compressed_page);
         } else {
             self.write_data_page(compressed_page)?;
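
The following sketch is not part of this commit. It isolates the offset arithmetic that `update_dictionary_location` performs on the offset index: the dictionary page moves to offset 0 and each data page is placed directly after the previous one, starting at the dictionary page's serialized length. `PageLocation` here is a simplified, hypothetical stand-in for the crate's offset-index entry, and `relocate_for_dictionary_first` is an invented name.

```rust
/// Hypothetical, simplified stand-in for one offset-index entry.
#[derive(Debug, Clone, PartialEq)]
pub struct PageLocation {
    pub offset: i64,
    pub compressed_page_size: i32,
}

/// Rewrites `locations` for a dictionary-first layout and returns the new
/// `(dictionary_page_offset, data_page_offset)` pair, or `None` when there
/// is no dictionary page (in which case nothing is changed).
pub fn relocate_for_dictionary_first(
    dictionary_len: usize,
    locations: &mut [PageLocation],
) -> Option<(i64, i64)> {
    if dictionary_len == 0 {
        return None;
    }
    // The first data page starts right after the dictionary page; every
    // later page starts where the previous one ends.
    let mut offset = dictionary_len as i64;
    for location in locations.iter_mut() {
        location.offset = offset;
        offset += location.compressed_page_size as i64;
    }
    Some((0, dictionary_len as i64))
}
```

For data pages of 100, 50 and 25 bytes recorded at offsets 0, 100 and 150, a 40-byte dictionary page shifts them to 40, 140 and 190.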
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 942013ea62..8ec16ba367 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -684,6 +684,30 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
     pub fn append_column<R: ChunkReader>(
         &mut self,
         reader: &R,
+        close: ColumnCloseResult,
+    ) -> Result<()> {
+        // Position a reader at the start of the buffered chunk, then splice the
+        // bytes through the shared streaming path.
+        let metadata = &close.metadata;
+        let src_offset = metadata
+            .dictionary_page_offset()
+            .unwrap_or_else(|| metadata.data_page_offset());
+        let read = reader.get_read(src_offset as _)?;
+        self.append_column_from_read(read, close)
+    }
+
+    /// Splice an already-encoded column chunk into the row group, reading its
+    /// bytes sequentially from `read`.
+    ///
+    /// `read` must be positioned at the start of the chunk (the dictionary page
+    /// if present, otherwise the first data page — i.e. `src_offset` below) and
+    /// yield exactly the chunk's compressed bytes. Unlike [`Self::append_column`]
+    /// this consumes an owned [`Read`], which lets the caller stream the bytes
+    /// back from a [`PageStore`](crate::column::page_store::PageStore) one page
+    /// at a time without materializing the whole chunk in memory.
+    pub(crate) fn append_column_from_read<R: Read>(
+        &mut self,
+        read: R,
         mut close: ColumnCloseResult,
     ) -> Result<()> {
         self.assert_previous_writer_closed()?;
@@ -707,7 +731,7 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
         let src_length = metadata.compressed_size();
 
         let write_offset = self.buf.bytes_written();
-        let mut read = reader.get_read(src_offset as _)?.take(src_length as _);
+        let mut read = read.take(src_length as _);
         let write_length = std::io::copy(&mut read, &mut self.buf)?;
 
         if src_length as u64 != write_length {
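
The following sketch is not part of this commit. It shows one way an owned `Read` could hand back a chunk one page at a time, which is the property the `append_column_from_read` docs rely on: at most one page is resident while `std::io::copy` drains the reader. `PageChainReader` is a hypothetical name, and the page source is modelled as a plain iterator of `io::Result<Vec<u8>>` rather than real `PageStore::take` calls.

```rust
use std::io::{self, Read};

/// Reads a sequence of page blobs back to back, fetching the next page only
/// once the current one has been fully consumed.
pub struct PageChainReader<I> {
    pages: I,
    current: Vec<u8>,
    pos: usize,
}

impl<I: Iterator<Item = io::Result<Vec<u8>>>> PageChainReader<I> {
    pub fn new(pages: I) -> Self {
        Self { pages, current: Vec::new(), pos: 0 }
    }
}

impl<I: Iterator<Item = io::Result<Vec<u8>>>> Read for PageChainReader<I> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Refill from the page source when the current page is exhausted;
        // the loop also skips any empty pages.
        while self.pos == self.current.len() {
            match self.pages.next() {
                Some(page) => {
                    self.current = page?;
                    self.pos = 0;
                }
                None => return Ok(0), // end of chunk
            }
        }
        let n = out.len().min(self.current.len() - self.pos);
        out[..n].copy_from_slice(&self.current[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}
```

In a real backend the iterator would yield the dictionary page first and then each data page in order, so the bytes arrive in their final on-disk layout.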
diff --git a/parquet/tests/arrow_writer.rs b/parquet/tests/arrow_writer.rs
index 020b4c6267..e4cc101000 100644
--- a/parquet/tests/arrow_writer.rs
+++ b/parquet/tests/arrow_writer.rs
@@ -17,13 +17,22 @@
 
 //! Tests for [`ArrowWriter`]
 
-use arrow::array::Float64Array;
-use arrow::datatypes::{DataType, Field, Schema};
-use arrow::record_batch::RecordBatch;
+use std::alloc::{GlobalAlloc, Layout, System};
+use std::cell::Cell;
+use std::fs::File;
+use std::io::{Read as _, Seek, SeekFrom, Write as _};
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, BinaryArray, Float64Array, Int32Array, RecordBatch};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use bytes::Bytes;
 use parquet::arrow::ArrowWriter;
+use parquet::arrow::arrow_writer::{
+    ArrowWriterOptions, PageKey, PageStore, PageStoreArgs, PageStoreFactory,
+};
 use parquet::basic::Encoding;
+use parquet::errors::Result;
 use parquet::file::properties::WriterProperties;
-use std::sync::Arc;
 
 #[test]
 #[should_panic(
@@ -48,3 +57,329 @@ fn test_delta_bit_pack_type() {
     let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), Some(props)).unwrap();
     let _ = writer.write(&record_batch);
 }
+
+// ---------------------------------------------------------------------------
+// Heap-memory regression test for the writer's page buffering.
+//
+// This proves the headline invariant of the pluggable [`PageStore`]: while a
+// row group is being written, the heap used to buffer completed pages grows
+// with the row group size for the default in-memory store, but stays bounded
+// (≈ a few pages per leaf column) once a spilling backend is plugged in.
+//
+// Peak heap is measured with a thread-local tracking allocator (the same
+// pattern used by `parquet/benches/arrow_reader_peak_memory.rs`), so the test
+// needs no external profiling dependency. Tracking is thread-local, so the
+// measured peak reflects only allocations made on the measuring thread; the
+// default `ArrowWriter` is single-threaded, so the writer's buffering all lands
+// there. Each measurement resets the peak to the current live baseline and
+// reports the delta, so the threads of unrelated tests in this binary do not
+// perturb it.
+//
+// [`PageStore`]: parquet::arrow::arrow_writer::PageStore
+// ---------------------------------------------------------------------------
+
+thread_local! {
+    static LIVE_BYTES: Cell<usize> = const { Cell::new(0) };
+    static PEAK_BYTES: Cell<usize> = const { Cell::new(0) };
+}
+
+struct TrackingAllocator {
+    inner: System,
+}
+
+#[global_allocator]
+static GLOBAL: TrackingAllocator = TrackingAllocator { inner: System };
+
+fn add_live_bytes(size: usize) {
+    LIVE_BYTES.with(|live| {
+        let new = live.get().saturating_add(size);
+        live.set(new);
+        PEAK_BYTES.with(|peak| {
+            if new > peak.get() {
+                peak.set(new);
+            }
+        });
+    });
+}
+
+fn subtract_live_bytes(size: usize) {
+    LIVE_BYTES.with(|live| {
+        live.set(live.get().saturating_sub(size));
+    });
+}
+
+#[allow(unsafe_code)]
+unsafe impl GlobalAlloc for TrackingAllocator {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let ptr = unsafe { self.inner.alloc(layout) };
+        if !ptr.is_null() {
+            add_live_bytes(layout.size());
+        }
+        ptr
+    }
+
+    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
+        subtract_live_bytes(layout.size());
+        unsafe { self.inner.dealloc(ptr, layout) };
+    }
+
+    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
+        let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) };
+        if !new_ptr.is_null() {
+            let old_size = layout.size();
+            if new_size > old_size {
+                add_live_bytes(new_size - old_size);
+            } else {
+                subtract_live_bytes(old_size - new_size);
+            }
+        }
+        new_ptr
+    }
+}
+
+/// Run `f` and return the peak *additional* live heap (bytes) observed on this
+/// thread during it — the delta from the live heap when `f` began.
+fn peak_heap_bytes(f: impl FnOnce()) -> usize {
+    let start = LIVE_BYTES.with(Cell::get);
+    // Reset the peak to the window's baseline so prior allocations don't count.
+    PEAK_BYTES.with(|peak| peak.set(start));
+    f();
+    PEAK_BYTES.with(Cell::get).saturating_sub(start)
+}
+
+/// Width of each value in the one "fat" column, in bytes.
+const FAT_VALUE_LEN: usize = 4096;
+/// Rows per input batch fed to the writer. Kept small so each batch is dropped
+/// promptly — only the writer's *buffering* should accumulate, not the input.
+const ROWS_PER_BATCH: usize = 64;
+/// Number of batches, all funnelled into a single large row group.
+const NUM_BATCHES: usize = 64;
+/// Total bytes of fat-column payload written (≈ 16 MiB).
+const TOTAL_FAT_BYTES: usize = FAT_VALUE_LEN * ROWS_PER_BATCH * NUM_BATCHES;
+
+/// A wide schema: one fat, high-cardinality binary column (the spill target)
+/// plus several tiny integer columns.
+fn skewed_schema() -> SchemaRef {
+    let mut fields = vec![Field::new("fat", DataType::Binary, false)];
+    for i in 0..8 {
+        fields.push(Field::new(format!("small_{i}"), DataType::Int32, false));
+    }
+    Arc::new(Schema::new(fields))
+}
+
+/// Build one batch of `ROWS_PER_BATCH` rows. The fat column holds unique,
+/// high-entropy values (so they neither dictionary-encode nor compress away),
+/// derived deterministically from `batch_index`.
+fn make_batch(schema: &SchemaRef, batch_index: usize) -> RecordBatch {
+    let mut fat: Vec<u8> = vec![0u8; FAT_VALUE_LEN * ROWS_PER_BATCH];
+    // A cheap xorshift fill keyed by the batch index → distinct, incompressible.
+    let mut state = (batch_index as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) | 1;
+    for byte in fat.iter_mut() {
+        state ^= state << 13;
+        state ^= state >> 7;
+        state ^= state << 17;
+        *byte = (state >> 24) as u8;
+    }
+    let offsets: Vec<i32> = (0..=ROWS_PER_BATCH)
+        .map(|i| (i * FAT_VALUE_LEN) as i32)
+        .collect();
+    let fat_array = BinaryArray::try_new(
+        arrow::buffer::OffsetBuffer::new(offsets.into()),
+        arrow::buffer::Buffer::from_vec(fat),
+        None,
+    )
+    .unwrap();
+
+    let mut columns: Vec<ArrayRef> = vec![Arc::new(fat_array)];
+    for c in 0..8 {
+        let vals: Vec<i32> = (0..ROWS_PER_BATCH)
+            .map(|r| (batch_index * ROWS_PER_BATCH + r + c) as i32)
+            .collect();
+        columns.push(Arc::new(Int32Array::from(vals)));
+    }
+    RecordBatch::try_new(schema.clone(), columns).unwrap()
+}
+
+/// Writer properties forcing the whole dataset into a single, uncompressed row
+/// group (so the page buffer is the only thing that grows).
+fn single_row_group_props() -> WriterProperties {
+    WriterProperties::builder()
+        .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+        // One row group for everything: never auto-flush on row count.
+        .set_max_row_group_row_count(Some(ROWS_PER_BATCH * NUM_BATCHES * 2))
+        .build()
+}
+
+/// Write the full skewed dataset with the given writer options, feeding small
+/// batches (each dropped immediately) into one row group.
+///
+/// The output is sent to [`std::io::sink`] so the produced file bytes never live on
+/// the heap — the measured peak then reflects only the writer's internal page
+/// *buffering*, which is exactly what a [`PageStore`] governs.
+fn write_skewed_dataset(options: ArrowWriterOptions) {
+    let schema = skewed_schema();
+    let mut writer =
+        ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap();
+    for b in 0..NUM_BATCHES {
+        let batch = make_batch(&schema, b);
+        writer.write(&batch).unwrap();
+        // `batch` dropped here — only the writer's internal buffering persists.
+    }
+    writer.close().unwrap();
+}
+
+/// A spilling [`PageStore`]: one temp file per column chunk. `put` appends the
+/// blob and records its `(offset, len)`; `take` seeks and reads it back. The
+/// file is unlinked on creation (via [`tempfile::tempfile`]) so it is cleaned up
+/// when the store is dropped. This is the canonical "spill completed pages off
+/// the heap" backend the design targets.
+struct TempFilePageStore {
+    file: File,
+    end: u64,
+    locs: Vec<(u64, usize)>,
+}
+
+impl TempFilePageStore {
+    fn new() -> Result<Self> {
+        Ok(Self {
+            file: tempfile::tempfile()?,
+            end: 0,
+            locs: Vec::new(),
+        })
+    }
+}
+
+impl PageStore for TempFilePageStore {
+    fn put(&mut self, value: Bytes) -> Result<PageKey> {
+        // Always append at the logical end (a prior `take` may have moved the
+        // OS file cursor).
+        self.file.seek(SeekFrom::Start(self.end))?;
+        self.file.write_all(&value)?;
+        let key = PageKey::new(self.locs.len() as u64);
+        self.locs.push((self.end, value.len()));
+        self.end += value.len() as u64;
+        Ok(key)
+    }
+
+    fn take(&mut self, key: PageKey) -> Result<Bytes> {
+        let (offset, len) = self.locs[key.get() as usize];
+        let mut buf = vec![0u8; len];
+        self.file.seek(SeekFrom::Start(offset))?;
+        self.file.read_exact(&mut buf)?;
+        Ok(Bytes::from(buf))
+    }
+}
+
+#[derive(Debug, Default)]
+struct TempFilePageStoreFactory;
+
+impl PageStoreFactory for TempFilePageStoreFactory {
+    fn create(&self, _args: &PageStoreArgs<'_>) -> Result<Box<dyn PageStore>> {
+        Ok(Box::new(TempFilePageStore::new()?))
+    }
+}
+
+/// Rows per batch / batches for the dictionary-column scenario (~4.2M rows).
+const DICT_ROWS_PER_BATCH: usize = 8192;
+const DICT_NUM_BATCHES: usize = 512;
+
+/// Write a single, low-cardinality (16 distinct values), high-row-count column
+/// as one row group. Such a column stays dictionary-encoded, so its completed
+/// data pages would historically pile up in `GenericColumnWriter` until close —
+/// the second accumulation point that plain page-buffer spilling does not reach.
+fn write_dict_dataset(options: ArrowWriterOptions) {
+    let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)]));
+    let props = WriterProperties::builder()
+        .set_compression(parquet::basic::Compression::UNCOMPRESSED)
+        .set_max_row_group_row_count(Some(DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES * 2))
+        .build();
+    let options = options.with_properties(props);
+    let mut writer =
+        ArrowWriter::try_new_with_options(std::io::sink(), schema.clone(), options).unwrap();
+    for b in 0..DICT_NUM_BATCHES {
+        let vals: Vec<i32> = (0..DICT_ROWS_PER_BATCH)
+            .map(|r| ((b + r) % 16) as i32)
+            .collect();
+        let batch =
+            RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vals))]).unwrap();
+        writer.write(&batch).unwrap();
+    }
+    writer.close().unwrap();
+}
+
+/// All measurements run in one function so they execute sequentially on a single
+/// thread — the tracking allocator is thread-local, so running them as separate
+/// parallel tests would each see only their own thread's allocations (which is
+/// fine), but keeping them together also keeps the in-memory/spill comparison on
+/// one consistent baseline.
+#[test]
+fn page_store_bounds_write_memory() {
+    let props = single_row_group_props();
+
+    // Baseline: the default in-memory store buffers the whole row group, so peak
+    // heap is at least the size of the buffered column data.
+    let in_memory_peak = peak_heap_bytes(|| {
+        let opts = ArrowWriterOptions::new().with_properties(props.clone());
+        write_skewed_dataset(opts);
+    });
+
+    // Spilling: the temp-file store keeps completed pages off the heap, so peak
+    // heap stays bounded by the in-flight encoder/dictionary buffers plus a page
+    // or two in flight — independent of the row group size.
+    let spill_peak = peak_heap_bytes(|| {
+        let opts = ArrowWriterOptions::new()
+            .with_properties(props.clone())
+            .with_page_store_factory(Arc::new(TempFilePageStoreFactory));
+        write_skewed_dataset(opts);
+    });
+
+    eprintln!(
+        "peak heap — in-memory: {:.1} MiB, temp-file spill: {:.1} MiB (total fat payload {:.1} MiB)",
+        in_memory_peak as f64 / (1024.0 * 1024.0),
+        spill_peak as f64 / (1024.0 * 1024.0),
+        TOTAL_FAT_BYTES as f64 / (1024.0 * 1024.0),
+    );
+
+    // The in-memory store must hold most of the ~16 MiB of buffered data.
+    let in_memory_floor = TOTAL_FAT_BYTES * 3 / 4;
+    assert!(
+        in_memory_peak >= in_memory_floor,
+        "expected in-memory peak >= {in_memory_floor} bytes, got {in_memory_peak}"
+    );
+
+    // The spilling store must stay near the per-column bound — roughly
+    // (data_page_size + dict_page_size) per leaf column, ~2 MiB × 9 columns —
+    // and far below the in-memory baseline. We assert a generous 8 MiB ceiling
+    // (well under the ~16 MiB row group) to stay robust across platforms.
+    const SPILL_CEILING: usize = 8 * 1024 * 1024;
+    assert!(
+        spill_peak < SPILL_CEILING,
+        "expected spilling peak < {SPILL_CEILING} bytes (bounded by page/dict size × columns), \
+         got {spill_peak}"
+    );
+    assert!(
+        spill_peak * 2 < in_memory_peak,
+        "expected spilling peak ({spill_peak}) to be far below the in-memory baseline \
+         ({in_memory_peak})"
+    );
+
+    // Dictionary-encoded column: completed data pages reach the page writer (and
+    // thus the store) as they are produced, so spilling bounds them too.
+    let dict_in_memory = peak_heap_bytes(|| write_dict_dataset(ArrowWriterOptions::new()));
+    let dict_spill = peak_heap_bytes(|| {
+        write_dict_dataset(
+            ArrowWriterOptions::new().with_page_store_factory(Arc::new(TempFilePageStoreFactory)),
+        )
+    });
+    eprintln!(
+        "dict column ({} rows) peak heap — in-memory: {:.2} MiB, temp-file spill: {:.2} MiB",
+        DICT_ROWS_PER_BATCH * DICT_NUM_BATCHES,
+        dict_in_memory as f64 / (1024.0 * 1024.0),
+        dict_spill as f64 / (1024.0 * 1024.0),
+    );
+    assert!(
+        dict_spill * 2 < dict_in_memory,
+        "expected dict-column spilling peak ({dict_spill}) to be far below the in-memory \
+         baseline ({dict_in_memory}) — dictionary data pages should spill, not accumulate"
+    );
+}
