This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a05129a08e feat(parquet): stream-encode definition/repetition levels 
incrementally (#9447)
a05129a08e is described below

commit a05129a08ecf2f1f39f7eba56eb1d76848f79c3a
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Wed Apr 1 15:21:19 2026 -0400

    feat(parquet): stream-encode definition/repetition levels incrementally 
(#9447)
    
    # Which issue does this PR close?
    
    - Closes #9446.
    - closes https://github.com/apache/arrow-rs/pull/9636
    
    # Rationale for this change
    
    When writing a Parquet column with very sparse data,
    `GenericColumnWriter` accumulates unbounded memory for definition and
    repetition levels. The raw `i16` values are appended into `Vec<i16>`
    sinks on every `write_batch` call and only RLE-encoded in bulk when a
    data page is flushed. For a column that is almost entirely nulls, the
    actual RLE-encoded output can be tiny, yet the intermediate buffer grows
    linearly with the number of rows.
    
    # What changes are included in this PR?
    
    Replace the two raw-level `Vec<i16>` sinks (`def_levels_sink` /
    `rep_levels_sink`) with streaming `LevelEncoder` fields
    (`def_levels_encoder` / `rep_levels_encoder`). Behavior is the same, but
    we keep running RLE-encoded state rather than the full list of rows in
    memory. Existing logic is reused.
    
    # Are these changes tested?
    
    Yes, all tests passing.
    Benchmarks show no regression. `list_primitive` benches improved by
    3-5%:
    
    ```
    Benchmarking list_primitive/default: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 6.1s, enable flat sampling, or reduce sample count to 60.
    list_primitive/default  time:   [1.2109 ms 1.2171 ms 1.2248 ms]
                            thrpt:  [1.6999 GiB/s 1.7105 GiB/s 1.7194 GiB/s]
                     change:
                            time:   [−3.7197% −2.8848% −2.0036%] (p = 0.00 < 
0.05)
                            thrpt:  [+2.0445% +2.9705% +3.8634%]
                            Performance has improved.
    Found 4 outliers among 100 measurements (4.00%)
      3 (3.00%) high mild
      1 (1.00%) high severe
    Benchmarking list_primitive/bloom_filter: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 7.5s, enable flat sampling, or reduce sample count to 50.
    list_primitive/bloom_filter
                            time:   [1.4405 ms 1.4810 ms 1.5292 ms]
                            thrpt:  [1.3615 GiB/s 1.4058 GiB/s 1.4452 GiB/s]
                     change:
                            time:   [−6.4332% −4.7568% −2.9048%] (p = 0.00 < 
0.05)
                            thrpt:  [+2.9917% +4.9944% +6.8755%]
                            Performance has improved.
    Found 5 outliers among 100 measurements (5.00%)
      2 (2.00%) high mild
      3 (3.00%) high severe
    Benchmarking list_primitive/parquet_2: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 6.3s, enable flat sampling, or reduce sample count to 60.
    list_primitive/parquet_2
                            time:   [1.2271 ms 1.2311 ms 1.2362 ms]
                            thrpt:  [1.6841 GiB/s 1.6911 GiB/s 1.6966 GiB/s]
                     change:
                            time:   [−5.8536% −4.9672% −4.1905%] (p = 0.00 < 
0.05)
                            thrpt:  [+4.3738% +5.2269% +6.2175%]
                            Performance has improved.
    Found 5 outliers among 100 measurements (5.00%)
      2 (2.00%) high mild
      3 (3.00%) high severe
    list_primitive/zstd     time:   [2.0056 ms 2.0148 ms 2.0262 ms]
                            thrpt:  [1.0275 GiB/s 1.0333 GiB/s 1.0381 GiB/s]
                     change:
                            time:   [−4.7073% −3.6719% −2.6698%] (p = 0.00 < 
0.05)
                            thrpt:  [+2.7431% +3.8118% +4.9398%]
                            Performance has improved.
    Found 12 outliers among 100 measurements (12.00%)
      2 (2.00%) high mild
      10 (10.00%) high severe
    list_primitive/zstd_parquet_2
                            time:   [2.0455 ms 2.0730 ms 2.1120 ms]
                            thrpt:  [1009.4 MiB/s 1.0043 GiB/s 1.0178 GiB/s]
                     change:
                            time:   [−5.8626% −3.7672% −1.4196%] (p = 0.00 < 
0.05)
                            thrpt:  [+1.4401% +3.9146% +6.2277%]
                            Performance has improved.
    Found 7 outliers among 100 measurements (7.00%)
      2 (2.00%) high mild
      5 (5.00%) high severe
    
    Benchmarking list_primitive_non_null/default: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 6.6s, enable flat sampling, or reduce sample count to 60.
    list_primitive_non_null/default
                            time:   [1.3199 ms 1.3333 ms 1.3504 ms]
                            thrpt:  [1.5384 GiB/s 1.5581 GiB/s 1.5740 GiB/s]
                     change:
                            time:   [−4.1662% −2.3491% −0.7148%] (p = 0.01 < 
0.05)
                            thrpt:  [+0.7200% +2.4056% +4.3473%]
                            Change within noise threshold.
    Found 6 outliers among 100 measurements (6.00%)
      3 (3.00%) high mild
      3 (3.00%) high severe
    Benchmarking list_primitive_non_null/bloom_filter: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 8.4s, enable flat sampling, or reduce sample count to 50.
    list_primitive_non_null/bloom_filter
                            time:   [1.6567 ms 1.6668 ms 1.6805 ms]
                            thrpt:  [1.2362 GiB/s 1.2464 GiB/s 1.2540 GiB/s]
                     change:
                            time:   [−2.7884% −1.3493% +0.2820%] (p = 0.07 > 
0.05)
                            thrpt:  [−0.2812% +1.3677% +2.8684%]
                            No change in performance detected.
    Found 4 outliers among 100 measurements (4.00%)
      1 (1.00%) high mild
      3 (3.00%) high severe
    Benchmarking list_primitive_non_null/parquet_2: Warming up for 3.0000 s
    Warning: Unable to complete 100 samples in 5.0s. You may wish to increase 
target time to 7.2s, enable flat sampling, or reduce sample count to 50.
    list_primitive_non_null/parquet_2
                            time:   [1.4279 ms 1.4409 ms 1.4551 ms]
                            thrpt:  [1.4277 GiB/s 1.4418 GiB/s 1.4550 GiB/s]
                     change:
                            time:   [−2.0598% −0.9952% −0.1318%] (p = 0.04 < 
0.05)
                            thrpt:  [+0.1319% +1.0052% +2.1032%]
                            Change within noise threshold.
    Found 3 outliers among 100 measurements (3.00%)
      2 (2.00%) high mild
      1 (1.00%) high severe
    list_primitive_non_null/zstd
                            time:   [2.6966 ms 2.7358 ms 2.7994 ms]
                            thrpt:  [759.93 MiB/s 777.60 MiB/s 788.89 MiB/s]
                     change:
                            time:   [−3.8379% −2.1418% +0.0785%] (p = 0.03 < 
0.05)
                            thrpt:  [−0.0784% +2.1887% +3.9911%]
                            Change within noise threshold.
    Found 7 outliers among 100 measurements (7.00%)
      3 (3.00%) high mild
      4 (4.00%) high severe
    list_primitive_non_null/zstd_parquet_2
                            time:   [2.7684 ms 2.7861 ms 2.8099 ms]
                            thrpt:  [757.07 MiB/s 763.55 MiB/s 768.44 MiB/s]
                     change:
                            time:   [−6.4460% −4.1387% −2.1474%] (p = 0.00 < 
0.05)
                            thrpt:  [+2.1946% +4.3174% +6.8901%]
                            Performance has improved.
    ```
    
    # Are there any user-facing changes?
    
    None. Some internal symbols are now unused. I added some
    `#[allow(dead_code)]` statements since these were experimental-visible
    and might be externally relied on.
    
    ---------
    
    Signed-off-by: Hippolyte Barraud <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/column/reader.rs              |  4 +-
 parquet/src/column/writer/mod.rs          | 73 +++++++++-----------------
 parquet/src/encodings/levels.rs           | 87 ++++++++++++++++++-------------
 parquet/src/encodings/rle.rs              | 17 +++++-
 parquet/src/util/bit_util.rs              |  7 +++
 parquet/src/util/test_common/page_util.rs |  2 +-
 6 files changed, 100 insertions(+), 90 deletions(-)

diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 29cb50185a..d1a328237f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1401,11 +1401,11 @@ mod tests {
         // Helper: build a DataPage v2 for this list column.
         let make_v2_page =
             |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: 
u32| -> Page {
-                let mut rep_enc = LevelEncoder::v2(max_rep_level, 
rep_levels.len());
+                let mut rep_enc = LevelEncoder::v2_streaming(max_rep_level);
                 rep_enc.put(rep_levels);
                 let rep_bytes = rep_enc.consume();
 
-                let mut def_enc = LevelEncoder::v2(max_def_level, 
def_levels.len());
+                let mut def_enc = LevelEncoder::v2_streaming(max_def_level);
                 def_enc.put(def_levels);
                 let def_bytes = def_enc.consume();
 
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 4c3dbabc21..cdf489f3b6 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -351,9 +351,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     /// but we use a BTreeSet so that the output is deterministic
     encodings: BTreeSet<Encoding>,
     encoding_stats: Vec<PageEncodingStats>,
-    // Reused buffers
-    def_levels_sink: Vec<i16>,
-    rep_levels_sink: Vec<i16>,
+    // Streaming level encoders for definition/repetition levels.
+    def_levels_encoder: LevelEncoder,
+    rep_levels_encoder: LevelEncoder,
     data_pages: VecDeque<CompressedPage>,
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
@@ -411,6 +411,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         Self {
+            def_levels_encoder: 
Self::create_level_encoder(descr.max_def_level(), &props),
+            rep_levels_encoder: 
Self::create_level_encoder(descr.max_rep_level(), &props),
             descr,
             props,
             statistics_enabled,
@@ -418,8 +420,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             codec,
             compressor,
             encoder,
-            def_levels_sink: vec![],
-            rep_levels_sink: vec![],
             data_pages: VecDeque::new(),
             page_metrics,
             column_metrics,
@@ -647,6 +647,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> 
{
         })
     }
 
+    /// Creates a new streaming level encoder appropriate for the writer 
version.
+    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> 
LevelEncoder {
+        match props.writer_version() {
+            WriterVersion::PARQUET_1_0 => 
LevelEncoder::v1_streaming(Encoding::RLE, max_level),
+            WriterVersion::PARQUET_2_0 => 
LevelEncoder::v2_streaming(max_level),
+        }
+    }
+
     /// Writes mini batch of values, definition and repetition levels.
     /// This allows fine-grained processing of values and maintaining a 
reasonable
     /// page size.
@@ -677,7 +685,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             // Update histogram
             self.page_metrics.update_definition_level_histogram(levels);
 
-            self.def_levels_sink.extend_from_slice(levels);
+            self.def_levels_encoder.put(levels);
             values_to_write
         } else {
             num_levels
@@ -708,7 +716,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             // Update histogram
             self.page_metrics.update_repetition_level_histogram(levels);
 
-            self.rep_levels_sink.extend_from_slice(levels);
+            self.rep_levels_encoder.put(levels);
         } else {
             // Each value is exactly one row.
             // Equals to the number of values, we count nulls as well.
@@ -1060,23 +1068,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                 let mut buffer = vec![];
 
                 if max_rep_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.rep_levels_sink[..],
-                            max_rep_level,
-                        )[..],
-                    );
+                    self.rep_levels_encoder
+                        .flush_to(|data| buffer.extend_from_slice(data));
                 }
 
                 if max_def_level > 0 {
-                    buffer.extend_from_slice(
-                        &self.encode_levels_v1(
-                            Encoding::RLE,
-                            &self.def_levels_sink[..],
-                            max_def_level,
-                        )[..],
-                    );
+                    self.def_levels_encoder
+                        .flush_to(|data| buffer.extend_from_slice(data));
                 }
 
                 buffer.extend_from_slice(&values_data.buf);
@@ -1106,15 +1104,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                 let mut buffer = vec![];
 
                 if max_rep_level > 0 {
-                    let levels = 
self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
-                    rep_levels_byte_len = levels.len();
-                    buffer.extend_from_slice(&levels[..]);
+                    self.rep_levels_encoder
+                        .flush_to(|data| buffer.extend_from_slice(data));
+                    rep_levels_byte_len = buffer.len();
                 }
 
                 if max_def_level > 0 {
-                    let levels = 
self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
-                    def_levels_byte_len = levels.len();
-                    buffer.extend_from_slice(&levels[..]);
+                    self.def_levels_encoder
+                        .flush_to(|data| buffer.extend_from_slice(data));
+                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
                 }
 
                 let uncompressed_size =
@@ -1164,10 +1162,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
 
         // Update total number of rows.
         self.column_metrics.total_rows_written += 
self.page_metrics.num_buffered_rows as u64;
-
-        // Reset state.
-        self.rep_levels_sink.clear();
-        self.def_levels_sink.clear();
         self.page_metrics.new_page();
 
         Ok(())
@@ -1244,23 +1238,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
         Ok(metadata)
     }
 
-    /// Encodes definition or repetition levels for Data Page v1.
-    #[inline]
-    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: 
i16) -> Vec<u8> {
-        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
-        encoder.put(levels);
-        encoder.consume()
-    }
-
-    /// Encodes definition or repetition levels for Data Page v2.
-    /// Encoding is always RLE.
-    #[inline]
-    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
-        let mut encoder = LevelEncoder::v2(max_level, levels.len());
-        encoder.put(levels);
-        encoder.consume()
-    }
-
     /// Writes compressed data page into underlying sink and updates global 
metrics.
     #[inline]
     fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 8c95e5ca51..b761a5ac5d 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -21,21 +21,7 @@ use super::rle::RleEncoder;
 
 use crate::basic::Encoding;
 use crate::data_type::AsBytes;
-use crate::util::bit_util::{BitWriter, ceil, num_required_bits};
-
-/// Computes max buffer size for level encoder/decoder based on encoding, max
-/// repetition/definition level and number of total buffered values (includes 
null
-/// values).
-#[inline]
-pub fn max_buffer_size(encoding: Encoding, max_level: i16, 
num_buffered_values: usize) -> usize {
-    let bit_width = num_required_bits(max_level as u64);
-    match encoding {
-        Encoding::RLE => RleEncoder::max_buffer_size(bit_width, 
num_buffered_values),
-        #[allow(deprecated)]
-        Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 
8),
-        _ => panic!("Unsupported encoding type {encoding}"),
-    }
-}
+use crate::util::bit_util::{BitWriter, num_required_bits};
 
 /// Encoder for definition/repetition levels.
 /// Currently only supports Rle and BitPacked (dev/null) encoding, including 
v2.
@@ -46,46 +32,44 @@ pub enum LevelEncoder {
 }
 
 impl LevelEncoder {
-    /// Creates new level encoder based on encoding, max level and underlying 
byte buffer.
-    /// For bit packed encoding it is assumed that buffer is already allocated 
with
-    /// `levels::max_buffer_size` method.
-    ///
-    /// Used to encode levels for Data Page v1.
+    /// Creates a new streaming level encoder for Data Page v1.
     ///
-    /// Panics, if encoding is not supported.
-    pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
-        let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
-        let mut buffer = Vec::with_capacity(capacity_bytes);
+    /// Unlike [`v1`](Self::v1), this does not require knowing the number of 
values
+    /// upfront, making it suitable for incremental encoding where levels are 
fed in
+    /// as they arrive via [`put`](Self::put).
+    pub fn v1_streaming(encoding: Encoding, max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
         match encoding {
             Encoding::RLE => {
                 // Reserve space for length header
-                buffer.extend_from_slice(&[0; 4]);
+                let buffer = vec![0u8; 4];
                 LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
             }
             #[allow(deprecated)]
             Encoding::BIT_PACKED => {
-                // Here we set full byte buffer without adjusting for 
num_buffered_values,
-                // because byte buffer will already be allocated with size from
-                // `max_buffer_size()` method.
-                LevelEncoder::BitPacked(bit_width, 
BitWriter::new_from_buf(buffer))
+                LevelEncoder::BitPacked(bit_width, 
BitWriter::new_from_buf(Vec::new()))
             }
             _ => panic!("Unsupported encoding type {encoding}"),
         }
     }
 
-    /// Creates new level encoder based on RLE encoding. Used to encode Data 
Page v2
-    /// repetition and definition levels.
-    pub fn v2(max_level: i16, capacity: usize) -> Self {
-        let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, 
capacity);
-        let buffer = Vec::with_capacity(capacity_bytes);
+    /// Creates a new streaming RLE level encoder for Data Page v2.
+    ///
+    /// Unlike [`v2`](Self::v2), this does not require knowing the number of 
values
+    /// upfront, making it suitable for incremental encoding where levels are 
fed in
+    /// as they arrive via [`put`](Self::put).
+    pub fn v2_streaming(max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
-        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
+        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, Vec::new()))
     }
 
     /// Put/encode levels vector into this level encoder.
     /// Returns number of encoded values that are less than or equal to length 
of the
     /// input buffer.
+    ///
+    /// This method does **not** flush the underlying encoder, so it can be 
called
+    /// incrementally across multiple batches without forcing run boundaries.
+    /// The encoder is flushed automatically when [`consume`](Self::consume) 
is called.
     #[inline]
     pub fn put(&mut self, buffer: &[i16]) -> usize {
         let mut num_encoded = 0;
@@ -95,14 +79,12 @@ impl LevelEncoder {
                     encoder.put(*value as u64);
                     num_encoded += 1;
                 }
-                encoder.flush();
             }
             LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
                 for value in buffer {
                     encoder.put_value(*value as u64, bit_width as usize);
                     num_encoded += 1;
                 }
-                encoder.flush();
             }
         }
         num_encoded
@@ -111,6 +93,7 @@ impl LevelEncoder {
     /// Finalizes level encoder, flush all intermediate buffers and return 
resulting
     /// encoded buffer. Returned buffer is already truncated to encoded bytes 
only.
     #[inline]
+    #[allow(unused)]
     pub fn consume(self) -> Vec<u8> {
         match self {
             LevelEncoder::Rle(encoder) => {
@@ -126,4 +109,34 @@ impl LevelEncoder {
             LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
         }
     }
+
+    /// Flushes all intermediate buffers, passes the encoded data to `f`, then
+    /// resets the encoder for reuse while retaining the buffer allocation.
+    #[inline]
+    pub fn flush_to<F, R>(&mut self, f: F) -> R
+    where
+        F: FnOnce(&[u8]) -> R,
+    {
+        let result = match self {
+            LevelEncoder::Rle(encoder) => {
+                let data = encoder.flush_buffer_mut();
+                // Patch the 4-byte length header reserved at the start of the 
buffer
+                let encoded_len = (data.len() - mem::size_of::<i32>()) as i32;
+                data[..4].copy_from_slice(&encoded_len.to_le_bytes());
+                f(data)
+            }
+            LevelEncoder::RleV2(encoder) => f(encoder.flush_buffer()),
+            LevelEncoder::BitPacked(_, encoder) => f(encoder.flush_buffer()),
+        };
+        match self {
+            LevelEncoder::Rle(encoder) => {
+                encoder.clear();
+                // Re-reserve the 4-byte length header for the next page
+                encoder.skip(mem::size_of::<i32>());
+            }
+            LevelEncoder::RleV2(encoder) => encoder.clear(),
+            LevelEncoder::BitPacked(_, encoder) => encoder.clear(),
+        }
+        result
+    }
 }
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index c95a46c634..2815c20dab 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -177,16 +177,22 @@ impl RleEncoder {
     /// Borrow equivalent of the `consume` method.
     /// Call `clear()` after invoking this method.
     #[inline]
-    #[allow(unused)]
     pub fn flush_buffer(&mut self) -> &[u8] {
         self.flush();
         self.bit_writer.flush_buffer()
     }
 
+    /// Like `flush_buffer`, but returns mutable access to the internal buffer.
+    /// Call `clear()` after invoking this method.
+    #[inline]
+    pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
+        self.flush();
+        self.bit_writer.flush_buffer_mut()
+    }
+
     /// Clears the internal state so this encoder can be reused (e.g., after 
becoming
     /// full).
     #[inline]
-    #[allow(unused)]
     pub fn clear(&mut self) {
         self.bit_writer.clear();
         self.num_buffered_values = 0;
@@ -196,6 +202,13 @@ impl RleEncoder {
         self.indicator_byte_pos = -1;
     }
 
+    /// Advances the buffer by `num_bytes` zero bytes, delegating to the
+    /// underlying [`BitWriter::skip`].
+    #[inline]
+    pub fn skip(&mut self, num_bytes: usize) {
+        self.bit_writer.skip(num_bytes);
+    }
+
     /// Flushes all remaining values and return the final byte buffer 
maintained by the
     /// internal writer.
     #[inline]
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 2625648258..7d7907f6f5 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -219,6 +219,13 @@ impl BitWriter {
         self.buffer()
     }
 
+    /// Like `flush_buffer`, but returns mutable access to the buffer.
+    #[inline]
+    pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
+        self.flush();
+        &mut self.buffer
+    }
+
     /// Clears the internal state so the buffer can be reused.
     #[inline]
     pub fn clear(&mut self) {
diff --git a/parquet/src/util/test_common/page_util.rs 
b/parquet/src/util/test_common/page_util.rs
index 5b64eb5413..6a99beaea1 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -75,7 +75,7 @@ impl DataPageBuilderImpl {
         if max_level <= 0 {
             return 0;
         }
-        let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, 
levels.len());
+        let mut level_encoder = LevelEncoder::v1_streaming(Encoding::RLE, 
max_level);
         level_encoder.put(levels);
         let encoded_levels = level_encoder.consume();
         // Actual encoded bytes (without length offset)

Reply via email to