This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 54fa8932b7 feat(parquet): fuse level encoding with counting and histogram updates (#9795)
54fa8932b7 is described below

commit 54fa8932b79d60dcb89264820114d6caf1edc211
Author: Hippolyte Barraud <[email protected]>
AuthorDate: Fri Apr 24 18:43:22 2026 -0400

    feat(parquet): fuse level encoding with counting and histogram updates (#9795)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    - Spawn off from #9653
    - Contributes to #9731
    
    # Rationale for this change
    
    See #9731
    
    # What changes are included in this PR?
    
    Add `put_with_observer()` to `LevelEncoder`, replacing `put()`. It calls an
    `FnMut(i16, usize)` observer, receiving `(level, count)`, for each value
    during encoding. This lets callers piggyback counting and histogram updates
    onto the encoding pass without extra iterations over the level buffer.
    
    Previously, `write_mini_batch()` made 3 separate passes over each level
    array: one to count non-null values or row boundaries, one to update the
    level histogram, and one to RLE-encode. Now all three operations happen
    in a single pass via the observer closure.
    
    Deprecate `LevelHistogram::update_from_levels()` in favor of a new
    `LevelHistogram::increment_by()` that accepts a count. Also remove the
    now-unnecessary `update_definition_level_histogram()` and
    `update_repetition_level_histogram()` methods from `PageMetrics`.
    
    # Are these changes tested?
    
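    All tests pass; existing tests give 100% coverage. A new test,
    `test_level_histogram_update_from_levels_compat`, checks that the deprecated
    `update_from_levels()` still produces the expected histogram.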
    
    # Are there any user-facing changes?
    
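    No behavior changes. On the public API side, `LevelHistogram::increment_by()`
    is added and `LevelHistogram::update_from_levels()` is deprecated.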
    
    ---------
    
    Signed-off-by: Hippolyte Barraud <[email protected]>
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/column/reader.rs              |  4 +--
 parquet/src/column/writer/mod.rs          | 56 +++++++++++++------------------
 parquet/src/encodings/levels.rs           | 26 ++++++++------
 parquet/src/file/metadata/mod.rs          | 17 +++++++++-
 parquet/src/util/test_common/page_util.rs |  2 +-
 5 files changed, 58 insertions(+), 47 deletions(-)

diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index d1a328237f..5ab78038f4 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1402,11 +1402,11 @@ mod tests {
         let make_v2_page =
             |rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: 
u32| -> Page {
                 let mut rep_enc = LevelEncoder::v2_streaming(max_rep_level);
-                rep_enc.put(rep_levels);
+                rep_enc.put_with_observer(rep_levels, |_, _| {});
                 let rep_bytes = rep_enc.consume();
 
                 let mut def_enc = LevelEncoder::v2_streaming(max_def_level);
-                def_enc.put(def_levels);
+                def_enc.put_with_observer(def_levels, |_, _| {});
                 let def_bytes = def_enc.consume();
 
                 let val_bytes: Vec<u8> = values.iter().flat_map(|v| 
v.to_le_bytes()).collect();
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 46f90d3f77..0c4e40b7ac 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -245,20 +245,6 @@ impl PageMetrics {
             .as_mut()
             .map(LevelHistogram::reset);
     }
-
-    /// Updates histogram values using provided repetition levels
-    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
-        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
-            rep_hist.update_from_levels(levels);
-        }
-    }
-
-    /// Updates histogram values using provided definition levels
-    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
-        if let Some(ref mut def_hist) = self.definition_level_histogram {
-            def_hist.update_from_levels(levels);
-        }
-    }
 }
 
 // Metrics per column writer
@@ -676,16 +662,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                 )
             })?;
 
-            let values_to_write = levels
-                .iter()
-                .map(|level| (*level == self.descr.max_def_level()) as usize)
-                .sum();
+            let mut values_to_write = 0usize;
+            let max_def = self.descr.max_def_level();
+            let encoder = &mut self.def_levels_encoder;
+            match self.page_metrics.definition_level_histogram.as_mut() {
+                Some(histogram) => encoder.put_with_observer(levels, |level, 
count| {
+                    values_to_write += count * (level == max_def) as usize;
+                    histogram.increment_by(level, count as i64);
+                }),
+                None => encoder.put_with_observer(levels, |level, count| {
+                    values_to_write += count * (level == max_def) as usize;
+                }),
+            };
             self.page_metrics.num_page_nulls += (levels.len() - 
values_to_write) as u64;
-
-            // Update histogram
-            self.page_metrics.update_definition_level_histogram(levels);
-
-            self.def_levels_encoder.put(levels);
             values_to_write
         } else {
             num_levels
@@ -708,15 +697,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, 
E> {
                 ));
             }
 
-            // Count the occasions where we start a new row
-            for &level in levels {
-                self.page_metrics.num_buffered_rows += (level == 0) as u32
-            }
-
-            // Update histogram
-            self.page_metrics.update_repetition_level_histogram(levels);
-
-            self.rep_levels_encoder.put(levels);
+            let mut new_rows = 0u32;
+            let encoder = &mut self.rep_levels_encoder;
+            match self.page_metrics.repetition_level_histogram.as_mut() {
+                Some(histogram) => encoder.put_with_observer(levels, |level, 
count| {
+                    new_rows += (count as u32) * (level == 0) as u32;
+                    histogram.increment_by(level, count as i64);
+                }),
+                None => encoder.put_with_observer(levels, |level, count| {
+                    new_rows += (count as u32) * (level == 0) as u32;
+                }),
+            };
+            self.page_metrics.num_buffered_rows += new_rows;
         } else {
             // Each value is exactly one row.
             // Equals to the number of values, we count nulls as well.
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 5d85e240cb..144914a4f5 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -34,7 +34,7 @@ impl LevelEncoder {
     ///
     /// This does not require knowing the number of values
     /// upfront, making it suitable for incremental encoding where levels are 
fed in
-    /// as they arrive via [`put`](Self::put).
+    /// as they arrive via [`put_with_observer`](Self::put_with_observer).
     pub fn v1_streaming(max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
         // Reserve space for length header
@@ -46,31 +46,35 @@ impl LevelEncoder {
     ///
     /// This does not require knowing the number of values
     /// upfront, making it suitable for incremental encoding where levels are 
fed in
-    /// as they arrive via [`put`](Self::put).
+    /// as they arrive via [`put_with_observer`](Self::put_with_observer).
     pub fn v2_streaming(max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
         LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, Vec::new()))
     }
 
-    /// Put/encode levels vector into this level encoder.
-    /// Returns number of encoded values that are less than or equal to length 
of the
-    /// input buffer.
+    /// Put/encode levels vector into this level encoder and call
+    /// `observer(value, count)` for each value encountered during encoding.
+    ///
+    /// Returns number of encoded values that are less than or equal to length
+    /// of the input buffer.
     ///
     /// This method does **not** flush the underlying encoder, so it can be 
called
     /// incrementally across multiple batches without forcing run boundaries.
     /// The encoder is flushed automatically when [`consume`](Self::consume) 
is called.
     #[inline]
-    pub fn put(&mut self, buffer: &[i16]) -> usize {
-        let mut num_encoded = 0;
+    pub fn put_with_observer<F>(&mut self, buffer: &[i16], mut observer: F) -> 
usize
+    where
+        F: FnMut(i16, usize),
+    {
         match *self {
             LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut 
encoder) => {
-                for value in buffer {
-                    encoder.put(*value as u64);
-                    num_encoded += 1;
+                for &value in buffer {
+                    encoder.put(value as u64);
+                    observer(value, 1);
                 }
+                buffer.len()
             }
         }
-        num_encoded
     }
 
     /// Finalizes level encoder, flush all intermediate buffers and return 
resulting
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 9304b6c25a..156385ddaa 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -919,14 +919,21 @@ impl LevelHistogram {
         }
     }
 
+    /// Increments the count for a level value by `count`.
+    #[inline]
+    pub fn increment_by(&mut self, level: i16, count: i64) {
+        self.inner[level as usize] += count;
+    }
+
     /// Updates histogram values using provided repetition levels
     ///
     /// # Panics
     /// if any of the levels is greater than the length of the histogram (
     /// the argument supplied to [`Self::try_new`])
+    #[deprecated(since = "58.2.0", note = "Use `increment_by` instead")]
     pub fn update_from_levels(&mut self, levels: &[i16]) {
         for &level in levels {
-            self.inner[level as usize] += 1;
+            self.increment_by(level, 1);
         }
     }
 }
@@ -1684,6 +1691,14 @@ mod tests {
         read_column_chunk, read_column_chunk_with_options, read_row_group,
     };
 
+    #[test]
+    #[allow(deprecated)]
+    fn test_level_histogram_update_from_levels_compat() {
+        let mut histogram = LevelHistogram::try_new(2).unwrap();
+        histogram.update_from_levels(&[0, 2, 1, 2, 2]);
+        assert_eq!(histogram.values(), &[1, 1, 3]);
+    }
+
     #[test]
     fn test_row_group_metadata_thrift_conversion() {
         let schema_descr = get_test_schema_descr();
diff --git a/parquet/src/util/test_common/page_util.rs 
b/parquet/src/util/test_common/page_util.rs
index 7797427872..0b73498cb2 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -76,7 +76,7 @@ impl DataPageBuilderImpl {
             return 0;
         }
         let mut level_encoder = LevelEncoder::v1_streaming(max_level);
-        level_encoder.put(levels);
+        level_encoder.put_with_observer(levels, |_, _| {});
         let encoded_levels = level_encoder.consume();
         // Actual encoded bytes (without length offset)
         let encoded_bytes = &encoded_levels[mem::size_of::<i32>()..];

Reply via email to