This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e7a0008e59 Implement dictionary support for reading ByteView from parquet (#5973)
e7a0008e59 is described below

commit e7a0008e591f0b92a8c1ba6a82744d91a95baa19
Author: Xiangpeng Hao <m...@haoxp.xyz>
AuthorDate: Wed Jul 3 06:09:45 2024 -0400

    Implement dictionary support for reading ByteView from parquet (#5973)
    
    * implement dictionary encoding support
    
    * update comments
---
 parquet/src/arrow/array_reader/byte_view_array.rs | 121 ++++++++++++++++++++--
 parquet/src/arrow/buffer/view_buffer.rs           |  13 +++
 2 files changed, 125 insertions(+), 9 deletions(-)

diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs
index 2d4e57414e..7f0a0dd2a5 100644
--- a/parquet/src/arrow/array_reader/byte_view_array.rs
+++ b/parquet/src/arrow/array_reader/byte_view_array.rs
@@ -17,6 +17,7 @@
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::view_buffer::ViewBuffer;
+use crate::arrow::decoder::DictIndexDecoder;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
@@ -25,6 +26,7 @@ use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use arrow_array::ArrayRef;
+use arrow_data::ByteView;
 use arrow_schema::DataType as ArrowType;
 use bytes::Bytes;
 use std::any::Any;
@@ -210,6 +212,7 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
 /// A generic decoder from uncompressed parquet value data to [`ViewBuffer`]
 pub enum ByteViewArrayDecoder {
     Plain(ByteViewArrayDecoderPlain),
+    Dictionary(ByteViewArrayDecoderDictionary),
 }
 
 impl ByteViewArrayDecoder {
@@ -227,10 +230,14 @@ impl ByteViewArrayDecoder {
                 num_values,
                 validate_utf8,
             )),
-            Encoding::RLE_DICTIONARY
-            | Encoding::PLAIN_DICTIONARY
-            | Encoding::DELTA_LENGTH_BYTE_ARRAY
-            | Encoding::DELTA_BYTE_ARRAY => unimplemented!("stay tuned!"),
+            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
+                ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
+                    data, num_levels, num_values,
+                ))
+            }
+            Encoding::DELTA_LENGTH_BYTE_ARRAY | Encoding::DELTA_BYTE_ARRAY => {
+                unimplemented!("stay tuned!")
+            }
             _ => {
                 return Err(general_err!(
                     "unsupported encoding for byte array: {}",
@@ -247,17 +254,27 @@ impl ByteViewArrayDecoder {
         &mut self,
         out: &mut ViewBuffer,
         len: usize,
-        _dict: Option<&ViewBuffer>,
+        dict: Option<&ViewBuffer>,
     ) -> Result<usize> {
         match self {
             ByteViewArrayDecoder::Plain(d) => d.read(out, len),
+            ByteViewArrayDecoder::Dictionary(d) => {
+                let dict = dict
+                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
+                d.read(out, dict, len)
+            }
         }
     }
 
     /// Skip `len` values
-    pub fn skip(&mut self, len: usize, _dict: Option<&ViewBuffer>) -> Result<usize> {
+    pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
         match self {
             ByteViewArrayDecoder::Plain(d) => d.skip(len),
+            ByteViewArrayDecoder::Dictionary(d) => {
+                let dict = dict
+                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
+                d.skip(dict, len)
+            }
         }
     }
 }
@@ -348,6 +365,90 @@ impl ByteViewArrayDecoderPlain {
     }
 }
 
+pub struct ByteViewArrayDecoderDictionary {
+    decoder: DictIndexDecoder,
+}
+
+impl ByteViewArrayDecoderDictionary {
+    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
+        Self {
+            decoder: DictIndexDecoder::new(data, num_levels, num_values),
+        }
+    }
+
+    /// Reads the next indexes from self.decoder
+    /// the indexes are assumed to be indexes into `dict`
+    /// the output values are written to output
+    ///
+    /// Assumptions / Optimization
+    /// This function checks if dict.buffers() are the last buffers in `output`, and if so
+    /// reuses the dictionary page buffers directly without copying data
+    fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
+        if dict.is_empty() || len == 0 {
+            return Ok(0);
+        }
+
+        // Check if the last few buffer of `output`` are the same as the `dict` buffer
+        // This is to avoid creating a new buffers if the same dictionary is used for multiple `read`
+        let need_to_create_new_buffer = {
+            if output.buffers.len() >= dict.buffers.len() {
+                let offset = output.buffers.len() - dict.buffers.len();
+                output.buffers[offset..]
+                    .iter()
+                    .zip(dict.buffers.iter())
+                    .any(|(a, b)| !a.ptr_eq(b))
+            } else {
+                true
+            }
+        };
+
+        if need_to_create_new_buffer {
+            for b in dict.buffers.iter() {
+                output.buffers.push(b.clone());
+            }
+        }
+
+        // Calculate the offset of the dictionary buffers in the output buffers
+        // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
+        // then the base_buffer_idx is 5 - 2 = 3
+        let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;
+
+        self.decoder.read(len, |keys| {
+            for k in keys {
+                let view = dict
+                    .views
+                    .get(*k as usize)
+                    .ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?;
+                let len = *view as u32;
+                if len <= 12 {
+                    // directly append the view if it is inlined
+                    // Safety: the view is from the dictionary, so it is valid
+                    unsafe {
+                        output.append_raw_view_unchecked(view);
+                    }
+                } else {
+                    // correct the buffer index and append the view
+                    let mut view = ByteView::from(*view);
+                    view.buffer_index += base_buffer_idx;
+                    // Safety: the view is from the dictionary,
+                    // we corrected the index value to point it to output buffer, so it is valid
+                    unsafe {
+                        output.append_raw_view_unchecked(&view.into());
+                    }
+                }
+            }
+            Ok(())
+        })
+    }
+
+    fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
+        if dict.is_empty() {
+            return Ok(0);
+        }
+        self.decoder.skip(to_skip)
+    }
+}
+
 /// Check that `val` is a valid UTF-8 sequence
 pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
     match std::str::from_utf8(val) {
@@ -386,8 +487,11 @@ mod tests {
             .unwrap();
 
         for (encoding, page) in pages {
-            if encoding != Encoding::PLAIN {
-                // skip non-plain encodings for now as they are not yet implemented
+            if encoding != Encoding::PLAIN
+                && encoding != Encoding::RLE_DICTIONARY
+                && encoding != Encoding::PLAIN_DICTIONARY
+            {
+                // skip unsupported encodings for now as they are not yet implemented
                 continue;
             }
             let mut output = ViewBuffer::default();
@@ -399,7 +503,6 @@ mod tests {
             assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);
 
             assert_eq!(output.views.len(), 4);
-            assert_eq!(output.buffers.len(), 4);
 
             let valid = [false, false, true, true, false, true, true, false, false];
             let valid_buffer = Buffer::from_iter(valid.iter().cloned());
diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs
index 01e7c4aad3..1651aa2d75 100644
--- a/parquet/src/arrow/buffer/view_buffer.rs
+++ b/parquet/src/arrow/buffer/view_buffer.rs
@@ -33,6 +33,10 @@ pub struct ViewBuffer {
 }
 
 impl ViewBuffer {
+    pub fn is_empty(&self) -> bool {
+        self.views.is_empty()
+    }
+
     #[allow(unused)]
     pub fn append_block(&mut self, block: Buffer) -> u32 {
         let block_id = self.buffers.len() as u32;
@@ -56,6 +60,15 @@ impl ViewBuffer {
         self.views.push(view);
     }
 
+    /// Directly append a view to the view array.
+    /// This is used when we create a StringViewArray from a dictionary whose values are StringViewArray.
+    ///
+    /// # Safety
+    /// The `view` must be a valid view as per the ByteView spec.
+    pub unsafe fn append_raw_view_unchecked(&mut self, view: &u128) {
+        self.views.push(*view);
+    }
+
     /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
     #[allow(unused)]
     pub fn into_array(self, null_buffer: Option<Buffer>, data_type: &ArrowType) -> ArrayRef {

Reply via email to