alamb commented on code in PR #9653:
URL: https://github.com/apache/arrow-rs/pull/9653#discussion_r3082019014


##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -466,12 +466,25 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         })
     }
 
-    fn write(&mut self, _values: &Self::Values, _offset: usize, _len: usize) 
-> Result<()> {
-        unreachable!("should call write_gather instead")
+    fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> 
Result<()> {
+        downcast_op!(

Review Comment:
   is this code actually callable now?



##########
parquet/src/encodings/rle.rs:
##########
@@ -41,7 +41,12 @@ use bytes::Bytes;
 use crate::errors::{ParquetError, Result};
 use crate::util::bit_util::{self, BitReader, BitWriter, FromBitpacked};
 
-/// Maximum groups of 8 values per bit-packed run. Current value is 64.
+/// Number of values in one bit-packed group. The Parquet RLE/bit-packing 
hybrid
+/// format always bit-packs values in multiples of this count (see the format 
spec:

Review Comment:
   Can you please provide a link in the comments to support this statement?



##########
parquet/src/encodings/rle.rs:
##########
@@ -114,27 +119,53 @@ impl RleEncoder {
         let rle_len_prefix = 1;
 
         // The length of an RLE run of 8
-        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as 
usize, 8);
+        let min_rle_run_size =
+            rle_len_prefix + bit_util::ceil(bit_width as usize, u8::BITS as 
usize);
 
         // The maximum size if stored as shortest possible RLE runs of 8
         let rle_max_size = num_runs * min_rle_run_size;
 
         bit_packed_max_size.max(rle_max_size)
     }
 
+    /// Returns `true` if the encoder is currently in RLE accumulation mode
+    /// for the given value (i.e., `repeat_count >= BIT_PACK_GROUP_SIZE` and 
`current_value == value`).
+    ///
+    /// The encoder enters accumulation mode as soon as the 8th consecutive 
identical
+    /// value has been seen: at that point `flush_buffered_values` has 
committed the
+    /// RLE decision and cleared the staging buffer, so no more per-element 
work is
+    /// needed.  Callers may use [`extend_run`](Self::extend_run) to add 
further
+    /// repetitions in O(1) once this returns `true`.
+    #[inline]
+    pub fn is_accumulating(&self, value: u64) -> bool {

Review Comment:
   Perhaps calling it `is_accumulating_rle` would help readers understand that 
this is specific to RLE mode.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -328,6 +314,77 @@ impl<T: Default> ColumnMetrics<T> {
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum LevelDataRef<'a> {

Review Comment:
   This seems pretty similar to `LevelData` -- why can't we just use 
`&LevelData`? 
   
   If there is a good reason, I think we need an explanation in the comments.



##########
parquet/src/encodings/levels.rs:
##########
@@ -46,31 +46,76 @@ impl LevelEncoder {
     ///
     /// This does not require knowing the number of values
     /// upfront, making it suitable for incremental encoding where levels are 
fed in
-    /// as they arrive via [`put`](Self::put).
+    /// as they arrive via [`put_with_observer`](Self::put_with_observer).
     pub fn v2_streaming(max_level: i16) -> Self {
         let bit_width = num_required_bits(max_level as u64);
         LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, Vec::new()))
     }
 
-    /// Put/encode levels vector into this level encoder.
-    /// Returns number of encoded values that are less than or equal to length 
of the
-    /// input buffer.
+    /// Put/encode levels vector into this level encoder and calls
+    /// `observer(value, count)` for each run of identical values encountered
+    /// during encoding.
+    /// Returns number of encoded values that are less than or equal to length
+    /// of the input buffer.
     ///
     /// This method does **not** flush the underlying encoder, so it can be 
called
     /// incrementally across multiple batches without forcing run boundaries.
     /// The encoder is flushed automatically when [`consume`](Self::consume) 
is called.
     #[inline]
-    pub fn put(&mut self, buffer: &[i16]) -> usize {

Review Comment:
   In theory, this is a breaking API change.
   
   However, `LevelEncoder` is part of the "experimental" API, which is 
documented as not being stable.



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -336,51 +348,69 @@ impl LevelInfoBuilder {
                 })
             };
 
-        let write_empty_slice = |child: &mut LevelInfoBuilder| {
-            child.visit_leaves(|leaf| {
-                let rep_levels = leaf.rep_levels.as_mut().unwrap();
-                rep_levels.push(ctx.rep_level - 1);
-                let def_levels = leaf.def_levels.as_mut().unwrap();
-                def_levels.push(ctx.def_level - 1);
-            })
+        let flush_nulls = |child: &mut LevelInfoBuilder, count: usize| {
+            if count > 0 {
+                child.visit_leaves(|leaf| {
+                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
+                    leaf.append_def_level_run(ctx.def_level - 2, count);
+                });
+            }
         };
 
-        let write_null_slice = |child: &mut LevelInfoBuilder| {
-            child.visit_leaves(|leaf| {
-                let rep_levels = leaf.rep_levels.as_mut().unwrap();
-                rep_levels.push(ctx.rep_level - 1);
-                let def_levels = leaf.def_levels.as_mut().unwrap();
-                def_levels.push(ctx.def_level - 2);
-            })
+        let flush_empties = |child: &mut LevelInfoBuilder, count: usize| {
+            if count > 0 {
+                child.visit_leaves(|leaf| {
+                    leaf.append_rep_level_run(ctx.rep_level - 1, count);
+                    leaf.append_def_level_run(ctx.def_level - 1, count);
+                });
+            }
         };
 
         match nulls {
             Some(nulls) => {
                 let null_offset = range.start;
+                let mut pending_nulls: usize = 0;

Review Comment:
   I think it might also help future readers to define what "empties" means in 
this context (and how it differs from null).



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -336,51 +348,69 @@ impl LevelInfoBuilder {
                 })
             };
 
-        let write_empty_slice = |child: &mut LevelInfoBuilder| {
-            child.visit_leaves(|leaf| {
-                let rep_levels = leaf.rep_levels.as_mut().unwrap();
-                rep_levels.push(ctx.rep_level - 1);
-                let def_levels = leaf.def_levels.as_mut().unwrap();
-                def_levels.push(ctx.def_level - 1);
-            })
+        let flush_nulls = |child: &mut LevelInfoBuilder, count: usize| {

Review Comment:
   Why rename the closures? `write_null_slice` / `write_empty_slice` seem more 
accurate to me. `flush_*` suggests draining an internal buffer, but these 
closures are really emitting null/empty level runs.
   



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -929,6 +929,12 @@ impl LevelHistogram {
             self.inner[level as usize] += 1;
         }
     }
+
+    /// Increments the count for a level value by `count`.
+    #[inline]
+    pub fn update_n(&mut self, level: i16, count: i64) {

Review Comment:
   what does the `n` stand for here? As in why not call this `update_count` to 
match the inner?



##########
parquet/src/arrow/arrow_writer/levels.rs:
##########
@@ -625,37 +666,54 @@ impl LevelInfoBuilder {
     fn write_leaf(info: &mut ArrayLevels, range: Range<usize>) {
         let len = range.end - range.start;
 
-        match &mut info.def_levels {
-            Some(def_levels) => {
-                def_levels.reserve(len);
-                info.non_null_indices.reserve(len);
-
-                match &info.logical_nulls {
-                    Some(nulls) => {
-                        assert!(range.end <= nulls.len());
-                        let nulls = nulls.inner();
-                        def_levels.extend(range.clone().map(|i| {
-                            // Safety: range.end was asserted to be in bounds 
earlier
-                            let valid = unsafe { nulls.value_unchecked(i) };
-                            info.max_def_level - (!valid as i16)
-                        }));
-                        info.non_null_indices.extend(
-                            BitIndexIterator::new(nulls.inner(), 
nulls.offset() + range.start, len)
-                                .map(|i| i + range.start),
-                        );
-                    }
-                    None => {
-                        let iter = std::iter::repeat_n(info.max_def_level, 
len);
-                        def_levels.extend(iter);
-                        info.non_null_indices.extend(range);
-                    }
-                }
+        // Fast path: entire leaf array is null
+        if let Some(nulls) = &info.logical_nulls {
+            if !matches!(info.def_levels, LevelData::Absent) && 
nulls.null_count() == nulls.len() {
+                info.extend_uniform_levels(info.max_def_level - 1, 
info.max_rep_level, len);
+                return;
+            }
+        }
+
+        match info.logical_nulls.clone() {

Review Comment:
   Why clone?



##########
parquet/src/encodings/rle.rs:
##########
@@ -114,27 +119,53 @@ impl RleEncoder {
         let rle_len_prefix = 1;
 
         // The length of an RLE run of 8
-        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as 
usize, 8);
+        let min_rle_run_size =
+            rle_len_prefix + bit_util::ceil(bit_width as usize, u8::BITS as 
usize);
 
         // The maximum size if stored as shortest possible RLE runs of 8
         let rle_max_size = num_runs * min_rle_run_size;
 
         bit_packed_max_size.max(rle_max_size)
     }
 
+    /// Returns `true` if the encoder is currently in RLE accumulation mode
+    /// for the given value (i.e., `repeat_count >= BIT_PACK_GROUP_SIZE` and 
`current_value == value`).
+    ///
+    /// The encoder enters accumulation mode as soon as the 8th consecutive 
identical
+    /// value has been seen: at that point `flush_buffered_values` has 
committed the
+    /// RLE decision and cleared the staging buffer, so no more per-element 
work is
+    /// needed.  Callers may use [`extend_run`](Self::extend_run) to add 
further
+    /// repetitions in O(1) once this returns `true`.
+    #[inline]
+    pub fn is_accumulating(&self, value: u64) -> bool {
+        self.repeat_count >= BIT_PACK_GROUP_SIZE && self.current_value == value
+    }
+
+    /// Extends the current RLE run by `count` additional repetitions.
+    ///
+    /// # Preconditions
+    /// The caller **must** have verified 
[`is_accumulating`](Self::is_accumulating)
+    /// returns `true` for the same value before calling this method.
+    #[inline]
+    pub fn extend_run(&mut self, count: usize) {
+        debug_assert!(self.repeat_count >= BIT_PACK_GROUP_SIZE);
+        self.repeat_count += count;
+    }
+
     /// Encodes `value`, which must be representable with `bit_width` bits.
     #[inline]
     pub fn put(&mut self, value: u64) {
-        // This function buffers 8 values at a time. After seeing 8 values, it
-        // decides whether the current run should be encoded in bit-packed or 
RLE.
+        // This function buffers BIT_PACK_GROUP_SIZE values at a time. After 
seeing that
+        // many values, it decides whether the current run should be encoded 
in bit-packed
+        // or RLE.
         if self.current_value == value {
             self.repeat_count += 1;
-            if self.repeat_count > 8 {
+            if self.repeat_count > BIT_PACK_GROUP_SIZE {

Review Comment:
   Using a named constant is better than hard-coded literals -- thank you.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -328,6 +314,77 @@ impl<T: Default> ColumnMetrics<T> {
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum LevelDataRef<'a> {
+    Absent,
+    Materialized(&'a [i16]),
+    Uniform { value: i16, count: usize },
+}
+
+impl<'a> LevelDataRef<'a> {
+    pub(crate) fn len(self) -> usize {
+        match self {
+            Self::Absent => 0,
+            Self::Materialized(values) => values.len(),
+            Self::Uniform { count, .. } => count,
+        }
+    }
+
+    pub(crate) fn first(self) -> Option<i16> {
+        match self {
+            Self::Absent => None,
+            Self::Materialized(values) => values.first().copied(),
+            Self::Uniform { value, count } => (count > 0).then_some(value),
+        }
+    }
+
+    #[cfg(feature = "arrow")]
+    pub(crate) fn value_at(self, idx: usize) -> Option<i16> {
+        match self {
+            Self::Absent => None,
+            Self::Materialized(values) => values.get(idx).copied(),
+            Self::Uniform { value, count } => (idx < count).then_some(value),
+        }
+    }
+
+    pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
+        match self {
+            Self::Absent => Self::Absent,
+            Self::Materialized(values) => 
Self::Materialized(&values[offset..offset + len]),
+            Self::Uniform { value, .. } => Self::Uniform { value, count: len },
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum ValueSelectionRef<'a> {

Review Comment:
   Likewise here -- an explanation of this and how it relates to 
`ValueSelection` would be really helpful.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to