This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2d8cd5a987 parquet: O(1) skip for bw=0 miniblocks in
DeltaBitPackDecoder (#9786)
2d8cd5a987 is described below
commit 2d8cd5a9876b9d761afd0ed78d33a951bc1aadb5
Author: Thaddeus Covert <[email protected]>
AuthorDate: Wed Apr 22 12:04:46 2026 -0400
parquet: O(1) skip for bw=0 miniblocks in DeltaBitPackDecoder (#9786)
- Closes #9783
In the non-terminal skip path of `DeltaBitPackDecoder::skip()`, miniblocks
with `bit_width=0` (bw=0) no longer call `get_batch`. Every delta in a bw=0
miniblock equals `min_delta` exactly, so `last_value` can be advanced by
`n * min_delta`, where `n` is the number of values skipped in that
miniblock, using a single `wrapping_mul` + `wrapping_add`. When
`min_delta == 0`, no update is needed at all.
**Benchmarks (`arrow_reader` bench vs upstream HEAD, `-- skip --baseline upstream`):**
```
Int32 skip single value: -23.7%
Int32 skip increasing value: -16.5%
Int32 skip mandatory, no NULLs: -4.1%
Int64 skip single value: -25.9%
Int64 skip increasing value: -21.0%
```
Results are consistent across all integer types (Int8/16/32/64,
UInt8/16/32/64, Decimal128). Benchmarks were run on a non-isolated machine
(no CPU frequency pinning), so variations of up to ±5% on non-bw=0 paths
should be attributed to measurement noise.
**Origin:** Constant and near-constant integer columns encoded with
DELTA_BINARY_PACKED are common in practice (timestamps with a uniform step,
run-length-like data). The bw=0 case is the hot path for those columns, and
the decode loop was doing unnecessary work.
**Verification:** Existing skip tests pass. Two new focused tests were added
for the uniform-step case (`min_delta != 0`), which was not covered by prior
tests. The bw=0 path produces the same `last_value` state as the decode-based
path by construction (`n * min_delta == sum of n copies of min_delta`).
Generated-by: Claude (claude-sonnet-4-6)
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/encodings/decoding.rs | 87 +++++++++++++++++++++++----------------
1 file changed, 51 insertions(+), 36 deletions(-)
diff --git a/parquet/src/encodings/decoding.rs
b/parquet/src/encodings/decoding.rs
index 5bc2cc9f11..f7f4d9be47 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -862,52 +862,50 @@ where
let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as
usize;
self.check_bit_width(bit_width)?;
let mini_block_to_skip = self.mini_block_remaining.min(to_skip -
skip);
- let mini_block_should_skip = mini_block_to_skip;
- let skip_count = self
- .bit_reader
- .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
-
- if skip_count != mini_block_to_skip {
- return Err(general_err!(
- "Expected to skip {} values from mini block got {}.",
- mini_block_batch_size,
- skip_count
- ));
- }
-
- // see commentary in self.get() above regarding optimizations
let min_delta = self.min_delta.as_i64()?;
if bit_width == 0 {
- // if min_delta == 0, there's nothing to do. self.last_value
is unchanged
+ // All remainders are zero: every delta equals min_delta
exactly.
+ // Advance last_value by n * min_delta with no bit reads.
if min_delta != 0 {
- let mut delta = self.min_delta;
- for v in &mut skip_buffer[0..skip_count] {
- *v = self.last_value.wrapping_add(&delta);
- delta = delta.wrapping_add(&self.min_delta);
- }
-
- self.last_value = skip_buffer[skip_count - 1];
- }
- } else if min_delta == 0 {
- for v in &mut skip_buffer[0..skip_count] {
- *v = v.wrapping_add(&self.last_value);
-
- self.last_value = *v;
+ let total = min_delta.wrapping_mul(mini_block_to_skip as
i64);
+ let step = T::T::from_i64(total)
+ .ok_or_else(|| general_err!("delta*n overflow in
skip"))?;
+ self.last_value = self.last_value.wrapping_add(&step);
}
+ // bit_width=0 payloads occupy zero bytes; no bit_reader
advancement needed.
} else {
- for v in &mut skip_buffer[0..skip_count] {
- *v = v
- .wrapping_add(&self.min_delta)
- .wrapping_add(&self.last_value);
+ // bw>0: must decode to track last_value for subsequent get()
calls.
+ let skip_count = self
+ .bit_reader
+ .get_batch(&mut skip_buffer[0..mini_block_to_skip],
bit_width);
+
+ if skip_count != mini_block_to_skip {
+ return Err(general_err!(
+ "Expected to skip {} values from mini block got {}.",
+ mini_block_to_skip,
+ skip_count
+ ));
+ }
- self.last_value = *v;
+ if min_delta == 0 {
+ for v in &mut skip_buffer[0..skip_count] {
+ *v = v.wrapping_add(&self.last_value);
+ self.last_value = *v;
+ }
+ } else {
+ for v in &mut skip_buffer[0..skip_count] {
+ *v = v
+ .wrapping_add(&self.min_delta)
+ .wrapping_add(&self.last_value);
+ self.last_value = *v;
+ }
}
}
- skip += mini_block_should_skip;
- self.mini_block_remaining -= mini_block_should_skip;
- self.values_left -= mini_block_should_skip;
+ skip += mini_block_to_skip;
+ self.mini_block_remaining -= mini_block_to_skip;
+ self.values_left -= mini_block_to_skip;
}
Ok(to_skip)
@@ -1690,6 +1688,23 @@ mod tests {
test_skip::<Int32Type>(block_data, Encoding::DELTA_BINARY_PACKED, 100);
}
+ #[test]
+ fn test_skip_delta_bit_packed_bw0_uniform_step_i32() {
+ // Uniform-step column: every delta equals min_delta, so bw=0
miniblocks.
+ // Partial skip must advance last_value by n * min_delta (min_delta !=
0 path).
+ let data: Vec<i32> = (0..128).map(|i| i * 7).collect();
+ test_skip::<Int32Type>(data.clone(), Encoding::DELTA_BINARY_PACKED,
50);
+ test_skip::<Int32Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
+ }
+
+ #[test]
+ fn test_skip_delta_bit_packed_bw0_uniform_step_i64() {
+ // Same as above for i64.
+ let data: Vec<i64> = (0..128).map(|i| i * 100).collect();
+ test_skip::<Int64Type>(data.clone(), Encoding::DELTA_BINARY_PACKED,
50);
+ test_skip::<Int64Type>(data, Encoding::DELTA_BINARY_PACKED, 200);
+ }
+
#[test]
fn test_delta_bit_packed_int32_multiple_blocks() {
// Test multiple 'put' calls on the same encoder