zhang2014 opened a new issue, #9957:
URL: https://github.com/apache/arrow-rs/issues/9957

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   
   I want to write **a large record batch into a parquet file consisting of a 
single (arbitrarily large) row group**. A typical use case is log data — a 
single parquet file can reach several GB to tens of GB.
   
   The ideal memory profile is:
   
   - columns processed strictly sequentially, only one leaf column being 
encoded at any time
   - for each column, pages are flushed to the file sink as soon as they are 
full
   - peak memory is on the order of one encoder's in-progress page, 
**independent of total row count and column count**
   
   This is the natural shape of the problem: a single row group means no 
cross-row-group buffering; sequential columns means no cross-column buffering; 
page-level streaming means no cross-page buffering. For files in the several-GB 
to tens-of-GB range, the absence of this streaming capability means writer peak 
memory scales with the total row group size instead of converging on a single 
page.
   
   Neither of the existing public paths supports this:
   
   **`ArrowWriter`** — `ArrowWriter::write(batch)` pushes data row-wise into 
every column's encoder. `ArrowColumnWriter` internally uses `ArrowPageWriter`, 
which buffers every compressed page into a `SharedColumnChunk` until `close()` 
is called, then appends the whole column chunk to the file sink via 
`ArrowColumnChunk::append_to_row_group`. Peak memory = Σ(compressed bytes of 
every leaf column's chunk) + encoder state for every column. For a 10 GB target 
file with a wide schema, even with 5× compression the combined compressed 
column chunks can reach the 2 GB range. There is no knob to make 
`ArrowPageWriter` flush pages as they are produced.
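   
   For reference, a minimal sketch of this path, assuming an input stream of 
   `RecordBatch`es and a hypothetical output file name. Even with 
   `set_max_row_group_size` raised so that everything lands in one row group, 
   every compressed page is retained by `ArrowPageWriter` until `close()`:
   
   ```rust
   use std::fs::File;
   
   use arrow_array::RecordBatch;
   use arrow_schema::SchemaRef;
   use parquet::arrow::ArrowWriter;
   use parquet::file::properties::WriterProperties;
   
   fn write_with_arrow_writer(
       schema: SchemaRef,
       batches: impl IntoIterator<Item = RecordBatch>,
   ) -> parquet::errors::Result<()> {
       let file = File::create("large.parquet")?;
       // Keep everything in a single row group by raising the size limit.
       let props = WriterProperties::builder()
           .set_max_row_group_size(usize::MAX)
           .build();
       let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
       for batch in batches {
           writer.write(&batch)?; // compressed pages accumulate in memory
       }
       writer.close()?; // buffered column chunks reach the sink only here
       Ok(())
   }
   ```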
   
   **`SerializedFileWriter` + `next_column()` + 
`SerializedColumnWriter::typed::<T>()`** — this is the natural low-level path. 
`SerializedPageWriter` (the default page writer under `next_column()`) already 
flushes each compressed page directly to the file sink, which is exactly the 
behavior I want. But two problems block this path:
   
   ***Problem 1: `SerializedColumnWriter` hardcodes the generic encoder.***
   
   ```rust
   pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
   
   pub enum ColumnWriter<'a> {
       ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
       // ...
   }
   ```
   
   Going through `next_column()` locks in `ColumnValueEncoderImpl<T>`. To use 
`ByteArrayEncoder` (the specialized zero-copy byte array encoder that 
`ArrowColumnWriter` uses internally), I need to construct 
`GenericColumnWriter<ByteArrayEncoder>` myself via a factory — but 
`SerializedRowGroupWriter::next_column_with_factory` is `pub(crate)`:
   
   ```rust
   pub(crate) fn next_column_with_factory<'b, F, C>(
       &'b mut self,
       factory: F,
   ) -> Result<Option<C>>
   where
       F: FnOnce(
           ColumnDescPtr,
           WriterPropertiesPtr,
           Box<dyn PageWriter + 'b>,
           OnCloseColumnChunk<'b>,
       ) -> Result<C>,
   ```
   
   The signature is already fully generic in the return type `C`; the only 
barrier is `pub(crate)`.
   
   ***Problem 2: `write_batch` requires a dense value array, discarding 
null-handling information I already have.***
   
   Arrow arrays naturally produce `(dense values, non-null indices)` pairs when 
computing definition/repetition levels (see `ArrayLevels::non_null_indices()`). 
The public API is:
   
   ```rust
   pub fn write_batch(
       &mut self,
       values: &E::Values,
       def_levels: Option<&[i16]>,
       rep_levels: Option<&[i16]>,
   ) -> Result<usize>
   ```
   
   There is no way for external callers to say "take values at these indices". 
The internal `write_batch_internal` has exactly this entry point:
   
   ```rust
   pub(crate) fn write_batch_internal(
       &mut self,
       values: &E::Values,
       value_indices: Option<&[usize]>,  // <-- this
       def_levels: LevelDataRef<'_>,
       rep_levels: LevelDataRef<'_>,
       min: Option<&E::T>,
       max: Option<&E::T>,
       distinct_count: Option<u64>,
   ) -> Result<usize>
   ```
   
   This is exactly the path `ArrowColumnWriter` uses internally via 
`write_primitive` to avoid materializing a gathered copy. External callers can 
only gather into a scratch `Vec<E::T>` and call `write_batch`, or go through 
`ArrowColumnWriter` and accept the column-chunk buffering.
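   
   For concreteness, a minimal sketch of that workaround on a nullable `Int64` 
   column (hypothetical caller; `values` is the Arrow values buffer with one 
   slot per logical row, `non_null_indices` marks the valid slots, e.g. from 
   level computation):
   
   ```rust
   use parquet::column::writer::ColumnWriterImpl;
   use parquet::data_type::Int64Type;
   
   fn write_nullable_i64(
       writer: &mut ColumnWriterImpl<'_, Int64Type>,
       values: &[i64],             // values buffer, one slot per logical row
       non_null_indices: &[usize], // positions of the valid (non-null) slots
       def_levels: &[i16],
   ) -> parquet::errors::Result<usize> {
       // The gathered scratch Vec is exactly the copy the internal
       // `value_indices` path avoids.
       let gathered: Vec<i64> = non_null_indices.iter().map(|&i| values[i]).collect();
       writer.write_batch(gathered.as_slice(), Some(def_levels), None)
   }
   ```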
   
   **Describe the solution you'd like**
   
   One visibility change plus two thin wrapper methods. No implementation changes, 
   no behavior changes.
   
   ***1. `SerializedRowGroupWriter::next_column_with_factory` → `pub`***
   
   ```diff
    impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
   -    pub(crate) fn next_column_with_factory<'b, F, C>(&'b mut self, factory: 
F) -> Result<Option<C>>
   +    pub fn next_column_with_factory<'b, F, C>(&'b mut self, factory: F) -> 
Result<Option<C>>
    }
   ```
   
   This lets callers plug in their own `GenericColumnWriter<E>` (including 
`GenericColumnWriter<ByteArrayEncoder>`) with the file's `SerializedPageWriter` 
already correctly wired up by arrow-rs internally.
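   
   As an ergonomics sketch (hypothetical: it assumes the proposed visibility 
   change), a caller could then wire up a page-streaming column writer like this. 
   For simplicity it constructs the default `ColumnWriterImpl<ByteArrayType>`; 
   the intended `GenericColumnWriter<ByteArrayEncoder>` would look the same, 
   provided that encoder is also nameable from outside the crate:
   
   ```rust
   use parquet::column::writer::ColumnWriterImpl;
   use parquet::data_type::{ByteArray, ByteArrayType};
   use parquet::file::writer::SerializedRowGroupWriter;
   
   fn stream_byte_array_column<W: std::io::Write + Send>(
       row_group: &mut SerializedRowGroupWriter<'_, W>,
       values: &[ByteArray],
       def_levels: &[i16],
   ) -> parquet::errors::Result<()> {
       // The factory hands back the column writer together with the close
       // callback, so the caller can stream pages and then report the
       // finished chunk to the row-group writer.
       let column = row_group.next_column_with_factory(|descr, props, page_writer, on_close| {
           // Pages produced here go straight through SerializedPageWriter to
           // the file sink instead of being buffered per column chunk.
           Ok((
               ColumnWriterImpl::<ByteArrayType>::new(descr, props, page_writer),
               on_close,
           ))
       })?;
   
       if let Some((mut writer, on_close)) = column {
           writer.write_batch(values, Some(def_levels), None)?;
           let close_result = writer.close()?;
           on_close(close_result)?; // registers the chunk metadata with the row group
       }
       Ok(())
   }
   ```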
   
   ***2. New `pub fn write_batch_with_indices` on `GenericColumnWriter`***
   
   ```rust
   impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
       pub fn write_batch_with_indices(
           &mut self,
           values: &E::Values,
           indices: &[usize],
           def_levels: Option<&[i16]>,
           rep_levels: Option<&[i16]>,
       ) -> Result<usize> {
           self.write_batch_internal(
               values, Some(indices),
               LevelDataRef::from(def_levels),
               LevelDataRef::from(rep_levels),
               None, None, None,
           )
       }
   
       pub fn write_batch_with_indices_and_statistics(
           &mut self,
           values: &E::Values,
           indices: &[usize],
           def_levels: Option<&[i16]>,
           rep_levels: Option<&[i16]>,
           min: Option<&E::T>,
           max: Option<&E::T>,
           distinct_count: Option<u64>,
       ) -> Result<usize> { ... }
   }
   ```
   
   Exposes the existing `value_indices` code path. `write_batch` already 
delegates to `write_batch_internal` with `value_indices: None`; the new API 
just exposes the `Some(indices)` variant as a public wrapper.
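   
   With that in place, the nullable-`Int64` workaround from Problem 2 reduces to 
   passing the indices straight through (hypothetical usage, assuming the 
   proposed method):
   
   ```rust
   use parquet::column::writer::ColumnWriterImpl;
   use parquet::data_type::Int64Type;
   
   fn write_nullable_i64_streaming(
       writer: &mut ColumnWriterImpl<'_, Int64Type>,
       values: &[i64],             // values buffer, one slot per logical row
       non_null_indices: &[usize], // positions of the valid (non-null) slots
       def_levels: &[i16],
   ) -> parquet::errors::Result<usize> {
       // No gathered scratch Vec: the writer reads values[i] for each index.
       writer.write_batch_with_indices(values, non_null_indices, Some(def_levels), None)
   }
   ```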
   
   **Describe alternatives you've considered**
   
   *Use `ArrowColumnWriter` as-is.* This is the right default for most callers, 
but not when the file is designed to contain a single large row group (as in 
log workloads reaching several GB to tens of GB per file), columns are 
processed strictly sequentially, and peak memory is dominated by 
per-column-chunk buffering rather than encoding cost. In this shape, the 
natural memory ceiling is one encoder's in-progress page, not one column chunk.
   
   *Pre-gather into a dense `Vec<T>` before `write_batch`.* Works for 
   fixed-width types, but spends an extra `K × sizeof(T)` bytes of scratch 
   memory (K = number of non-null values in the write) that the `value_indices` 
   path would avoid. For `ByteArrayType` the scratch is a `Vec<ByteArray>` 
   (~32 bytes per row).
   
   *Reimplement parquet encoding downstream.* The Arrow → Parquet dispatch 
(`write_leaf`), level computation, and byte array encoders are non-trivial and 
evolve with the format spec. Not a viable long-term maintenance path.
   
   **Additional context**
   
   The `SerializedFileWriter` + `SerializedPageWriter` infrastructure already 
supports page-streamed column writes end-to-end — what's missing is just the 
plumbing to connect it with Arrow input and the existing specialized encoders. 
Happy to submit a PR if this direction is acceptable.

