This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d11b388a0 Implement Skip for DeltaBitPackDecoder (#2393)
d11b388a0 is described below

commit d11b388a0dfad3cfe0ca30b6a26fff0d88006ca0
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Aug 12 16:26:02 2022 +0800

    Implement Skip for DeltaBitPackDecoder (#2393)
    
    * Implement Skip for DeltaBitPackDecoder
    
    * move check out of loop
    
    * add bench
    
    * change to use batch read.
---
 parquet/benches/arrow_reader.rs   | 53 ++++++++++++++++++++++++++++++++++++
 parquet/src/encodings/decoding.rs | 57 +++++++++++++++++++++++++++++++++++++--
 2 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index dc2ed8355..a3c904505 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -300,6 +300,26 @@ fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
     total_count
 }
 
+fn bench_array_reader_skip(mut array_reader: Box<dyn ArrayReader>) -> usize {
+    // test procedure: alternate between reading and skipping batches of 8192 until no more data
+    let mut total_count = 0;
+    let mut skip = false;
+    let mut array_len;
+    loop {
+        if skip {
+            array_len = array_reader.skip_records(BATCH_SIZE).unwrap();
+        } else {
+            let array = array_reader.next_batch(BATCH_SIZE);
+            array_len = array.unwrap().len();
+        }
+        total_count += array_len;
+        skip = !skip;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
 fn create_primitive_array_reader(
     page_iterator: impl PageIterator + 'static,
     column_desc: ColumnDescPtr,
@@ -445,6 +465,39 @@ fn bench_primitive<T>(
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
+    // binary packed skip, no NULLs
+    let data = build_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        mandatory_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_BINARY_PACKED,
+    );
+    group.bench_function("binary packed skip, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_primitive_array_reader(
+                data.clone(),
+                mandatory_column_desc.clone(),
+            );
+            count = bench_array_reader_skip(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_encoded_primitive_page_iterator::<T>(
+        schema.clone(),
+        optional_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_BINARY_PACKED,
+    );
+    group.bench_function("binary packed skip, optional, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), optional_column_desc.clone());
+            count = bench_array_reader_skip(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
     // binary packed, half NULLs
     let data = build_encoded_primitive_page_iterator::<T>(
         schema.clone(),
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index bb1e7137a..86941ffe0 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -736,8 +736,61 @@ where
     }
 
     fn skip(&mut self, num_values: usize) -> Result<usize> {
-        let mut buffer = vec![T::T::default(); num_values];
-        self.get(&mut buffer)
+        let mut skip = 0;
+        let to_skip = num_values.min(self.values_left);
+        if to_skip == 0 {
+            return Ok(0);
+        }
+
+        // try to consume first value in header.
+        if let Some(value) = self.first_value.take() {
+            self.last_value = value;
+            skip += 1;
+            self.values_left -= 1;
+        }
+
+        let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
+            Type::INT32 => 32,
+            Type::INT64 => 64,
+            _ => unreachable!(),
+        };
+
+        let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
+        while skip < to_skip {
+            if self.mini_block_remaining == 0 {
+                self.next_mini_block()?;
+            }
+
+            let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
+            let mini_block_to_skip = self.mini_block_remaining.min(to_skip - skip);
+            let mini_block_should_skip = mini_block_to_skip;
+
+            let skip_count = self
+                .bit_reader
+                .get_batch(&mut skip_buffer[0..mini_block_to_skip], bit_width);
+
+            if skip_count != mini_block_to_skip {
+                return Err(general_err!(
+                    "Expected to skip {} values from mini block got {}.",
+                    mini_block_batch_size,
+                    skip_count
+                ));
+            }
+
+            for v in &mut skip_buffer[0..skip_count] {
+                *v = v
+                    .wrapping_add(&self.min_delta)
+                    .wrapping_add(&self.last_value);
+
+                self.last_value = *v;
+            }
+
+            skip += mini_block_should_skip;
+            self.mini_block_remaining -= mini_block_should_skip;
+            self.values_left -= mini_block_should_skip;
+        }
+
+        Ok(to_skip)
     }
 }
 

Reply via email to