tustvold commented on a change in pull request #1082:
URL: https://github.com/apache/arrow-rs/pull/1082#discussion_r785470310
##########
File path: parquet/src/arrow/array_reader/byte_array.rs
##########
@@ -0,0 +1,639 @@
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::{RecordBuffer, TypedBuffer, ValueBuffer};
+use crate::arrow::record_reader::GenericRecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::Encoding;
+use crate::column::page::PageIterator;
+use crate::column::reader::decoder::{ColumnValueDecoder, ValuesWriter};
+use crate::data_type::Int32Type;
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
+use crate::encodings::rle::RleDecoder;
+use crate::errors::{ParquetError, Result};
+use crate::memory::ByteBufferPtr;
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::{
+    ArrayData, ArrayDataBuilder, ArrayRef, BinaryArray, LargeBinaryArray,
+    LargeStringArray, OffsetSizeTrait, StringArray,
+};
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::ops::Range;
+use std::sync::Arc;
+
+enum Reader {
+    Binary(GenericRecordReader<OffsetBuffer<i32>, ByteArrayDecoder<i32>>),
+    LargeBinary(GenericRecordReader<OffsetBuffer<i64>, ByteArrayDecoder<i64>>),
+    Utf8(GenericRecordReader<OffsetBuffer<i32>, ByteArrayDecoder<i32>>),
+    LargeUtf8(GenericRecordReader<OffsetBuffer<i64>, ByteArrayDecoder<i64>>),
+}
+
+fn consume_array_data<I: OffsetSizeTrait>(
+    data_type: ArrowType,
+    reader: &mut GenericRecordReader<OffsetBuffer<I>, ByteArrayDecoder<I>>,
+) -> Result<ArrayData> {
+    let buffer = reader.consume_record_data()?;
+    let mut array_data_builder = ArrayDataBuilder::new(data_type)
+        .len(buffer.len())
+        .add_buffer(buffer.offsets.into())
+        .add_buffer(buffer.values.into());
+
+    if let Some(buffer) = reader.consume_bitmap_buffer()? {
+        array_data_builder = array_data_builder.null_bit_buffer(buffer);
+    }
+    Ok(unsafe { array_data_builder.build_unchecked() })
+}
+
+pub struct ByteArrayReader {
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    column_desc: ColumnDescPtr,
+    record_reader: Reader,
+}
+
+impl ByteArrayReader {
+    /// Construct byte array reader.
+    pub fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+    ) -> Result<Self> {
+        Self::new_with_options(pages, column_desc, arrow_type, false)
+    }
+
+    /// Construct byte array reader with the ability to only compute the null mask
+    /// and not buffer any level data
+    pub fn new_with_options(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+        null_mask_only: bool,
+    ) -> Result<Self> {
+        // Check if Arrow type is specified, else create it from the Parquet type
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+
+        let record_reader = match data_type {
+            ArrowType::Binary => Reader::Binary(GenericRecordReader::new_with_options(
+                column_desc.clone(),
+                null_mask_only,
+            )),
+            ArrowType::LargeBinary => {
+                Reader::LargeBinary(GenericRecordReader::new_with_options(
+                    column_desc.clone(),
+                    null_mask_only,
+                ))
+            }
+            ArrowType::Utf8 => Reader::Utf8(GenericRecordReader::new_with_options(
+                column_desc.clone(),
+                null_mask_only,
+            )),
+            ArrowType::LargeUtf8 => {
+                Reader::LargeUtf8(GenericRecordReader::new_with_options(
+                    column_desc.clone(),
+                    null_mask_only,
+                ))
+            }
+            _ => {
+                return Err(general_err!(
+                    "invalid data type for ByteArrayReader - {}",
+                    data_type
+                ))
+            }
+        };
+
+        Ok(Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            record_reader,
+        })
+    }
+}
+
+impl ArrayReader for ByteArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> crate::errors::Result<ArrayRef> {
+        let data = match &mut self.record_reader {
+            Reader::Binary(r) | Reader::Utf8(r) => {
+                read_records(r, self.pages.as_mut(), batch_size)?;
+                let data = consume_array_data(self.data_type.clone(), r)?;
+                self.def_levels_buffer = r.consume_def_levels()?;
+                self.rep_levels_buffer = r.consume_rep_levels()?;
+                r.reset();
+                data
+            }
+            Reader::LargeBinary(r) | Reader::LargeUtf8(r) => {
+                read_records(r, self.pages.as_mut(), batch_size)?;
+                let data = consume_array_data(self.data_type.clone(), r)?;
+                self.def_levels_buffer = r.consume_def_levels()?;
+                self.rep_levels_buffer = r.consume_rep_levels()?;
+                r.reset();
+                data
+            }
+        };
+
+        Ok(match &self.record_reader {
+            Reader::Binary(_) => Arc::new(BinaryArray::from(data)),
+            Reader::LargeBinary(_) => Arc::new(LargeBinaryArray::from(data)),
+            Reader::Utf8(_) => Arc::new(StringArray::from(data)),
+            Reader::LargeUtf8(_) => Arc::new(LargeStringArray::from(data)),
+        })
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer
+            .as_ref()
+            .map(|buf| unsafe { buf.typed_data() })
+    }
+}
+
+struct OffsetBuffer<I> {
+    offsets: TypedBuffer<I>,
+    values: TypedBuffer<u8>,
+}
+
+impl<I> Default for OffsetBuffer<I> {
+    fn default() -> Self {
+        let mut offsets = TypedBuffer::new();
+        offsets.resize(1);
+        Self {
+            offsets,
+            values: TypedBuffer::new(),
+        }
+    }
+}
+
+impl<I: OffsetSizeTrait> OffsetBuffer<I> {
+    fn len(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    fn try_push(&mut self, data: &[u8]) -> Result<()> {
+        self.values.extend_from_slice(data);
+
+        let index_offset = I::from_usize(self.values.len())
+            .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
+
+        self.offsets.push(index_offset);
+        Ok(())
+    }
+}
+
+impl<I: OffsetSizeTrait> RecordBuffer for OffsetBuffer<I> {
+    type Output = Self;
+    type Writer = Self;
+
+    fn split(&mut self, len: usize) -> Self::Output {
+        let remaining_offsets = self.offsets.len() - len - 1;
+        let offsets = self.offsets.as_slice();
+
+        let end_offset = offsets[len];
+
+        // Rebase the offsets of the remaining values to start at zero; these stay
+        // in `self` whilst the first `len` values are split off and returned
+        let mut new_offsets = TypedBuffer::new();
+        new_offsets.reserve(remaining_offsets + 1);
+        for v in &offsets[len..] {
+            new_offsets.push(*v - end_offset);
+        }
+
+        self.offsets.resize(len + 1);
+
+        Self {
+            offsets: std::mem::replace(&mut self.offsets, new_offsets),
+            values: self.values.take(end_offset.to_usize().unwrap()),
+        }
+    }
+
+    fn writer(&mut self, _batch_size: usize) -> &mut Self::Writer {
+        self
+    }
+
+    fn commit(&mut self, len: usize) {
+        assert_eq!(self.offsets.len(), len + 1);
+    }
+}
+
+impl<I: OffsetSizeTrait> ValueBuffer for OffsetBuffer<I> {
+    fn pad_nulls(
+        &mut self,
+        values_range: Range<usize>,
+        levels_range: Range<usize>,
+        rev_position_iter: impl Iterator<Item = usize>,
+    ) {
+        assert_eq!(self.offsets.len(), values_range.end + 1);
+        self.offsets.resize(levels_range.end + 1);
+
+        let offsets = self.offsets.as_slice_mut();
+
+        let values_start = values_range.start;
+        let mut last_offset = levels_range.end + 1;
+
+        for (value_pos, level_pos) in values_range.rev().zip(rev_position_iter) {
+            assert!(level_pos >= value_pos);
+            assert!(level_pos < last_offset);
+
+            if level_pos == value_pos {
+                // Pad trailing nulls if necessary
+                if level_pos != last_offset && last_offset == levels_range.end + 1 {
+                    let value = offsets[value_pos];
+                    for x in &mut offsets[level_pos + 1..last_offset] {
+                        *x = value;
+                    }
+                }
+
+                // We are done
+                return;
+            }
+
+            // Fill in any nulls
+            let value_end = offsets[value_pos + 1];
+            let value_start = offsets[value_pos];
+
+            for x in &mut offsets[level_pos + 1..last_offset] {
+                *x = value_end;
+            }
+
+            offsets[level_pos] = value_start;
+            last_offset = level_pos;
+        }
+
+        // Pad leading nulls up to `last_offset`
+        let value = offsets[values_start];
+        for x in &mut offsets[values_start + 1..last_offset] {
+            *x = value;
+        }
+    }
+}
+
+impl<I> ValuesWriter for OffsetBuffer<I> {
+    fn capacity(&self) -> usize {
+        usize::MAX
+    }
+}
+
+struct ByteArrayDecoder<I> {
+    dict: Option<OffsetBuffer<I>>,
+    decoder: Option<StringDecoder>,
+}
+
+impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayDecoder<I> {
+    type Writer = OffsetBuffer<I>;
+
+    fn create(_: &ColumnDescPtr) -> Self {
+        Self {
+            dict: None,
+            decoder: None,
+        }
+    }
+
+    fn set_dict(
+        &mut self,
+        buf: ByteBufferPtr,
+        num_values: u32,
+        encoding: Encoding,
+        _is_sorted: bool,
+    ) -> Result<()> {
+        if !matches!(
+            encoding,
+            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
+        ) {
+            return Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ));
+        }
+
+        let mut buffer = OffsetBuffer::default();
+        let mut decoder = PlainDecoder::new(buf, num_values as usize);
+        decoder.read(&mut buffer, usize::MAX)?;
+        self.dict = Some(buffer);
+        Ok(())
+    }
+
+    fn set_data(
+        &mut self,
+        encoding: Encoding,
+        data: ByteBufferPtr,
+        num_values: usize,
+    ) -> Result<()> {
+        let decoder = match encoding {
+            Encoding::PLAIN => StringDecoder::Plain(PlainDecoder::new(data, num_values)),
+            Encoding::RLE_DICTIONARY => {
+                StringDecoder::Dictionary(DictionaryDecoder::new(data))
+            }
+            Encoding::DELTA_LENGTH_BYTE_ARRAY => {
+                StringDecoder::DeltaLength(DeltaLengthDecoder::new(data, num_values)?)
+            }
+            Encoding::DELTA_BYTE_ARRAY => {
+                StringDecoder::DeltaStrings(DeltaStringsDecoder::new(data, num_values)?)
+            }
+            _ => {
+                return Err(general_err!(
+                    "unsupported encoding for byte array: {}",
+                    encoding
+                ))
+            }
+        };
+        self.decoder = Some(decoder);
+        Ok(())
+    }
+
+    fn read(&mut self, out: &mut Self::Writer, range: Range<usize>) -> Result<usize> {
+        let len = range.end - range.start;
+        match self.decoder.as_mut().expect("decoder set") {
+            StringDecoder::Plain(d) => d.read(out, len),
+            StringDecoder::Dictionary(d) => {
+                let dict = self.dict.as_ref().expect("dictionary set");
+                d.read(out, dict, len)
+            }
+            StringDecoder::DeltaLength(d) => d.read(out, len),
+            StringDecoder::DeltaStrings(d) => d.read(out, len),
+        }
+    }
+}
+
+enum StringDecoder {
+    Plain(PlainDecoder),
+    Dictionary(DictionaryDecoder),
+    DeltaLength(DeltaLengthDecoder),
+    DeltaStrings(DeltaStringsDecoder),
+}
+
+/// Decoder for [`Encoding::PLAIN`]
+struct PlainDecoder {
+    buf: ByteBufferPtr,
+    offset: usize,
+    remaining_values: usize,
+}
+
+impl PlainDecoder {
+    fn new(buf: ByteBufferPtr, values: usize) -> Self {
+        Self {
+            buf,
+            offset: 0,
+            remaining_values: values,
+        }
+    }
+
+    fn read<I: OffsetSizeTrait>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let to_read = len.min(self.remaining_values);
+        output.offsets.reserve(to_read);
+
+        let remaining_bytes = self.buf.len() - self.offset;
+        if remaining_bytes == 0 {
+            return Ok(0);
+        }
+
+        let estimated_bytes = remaining_bytes
+            .checked_mul(to_read)
+            .map(|x| x / self.remaining_values)
+            .unwrap_or_default();
+
+        output.values.reserve(estimated_bytes);
+
+        let mut read = 0;
+
+        let buf = self.buf.as_ref();
+        while self.offset < self.buf.len() && read != to_read {
+            if self.offset + 4 > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+            let len_bytes: [u8; 4] =
+                buf[self.offset..self.offset + 4].try_into().unwrap();
+            let len = u32::from_le_bytes(len_bytes);
+
+            let start_offset = self.offset + 4;
+            let end_offset = start_offset + len as usize;
+            if end_offset > buf.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            output.try_push(&buf[start_offset..end_offset])?;
+
+            self.offset = end_offset;
+            read += 1;
+        }
+        self.remaining_values -= to_read;
+        Ok(to_read)
+    }
+}
+
+/// Decoder for [`Encoding::DELTA_LENGTH_BYTE_ARRAY`]
+struct DeltaLengthDecoder {
+    lengths: Vec<i32>,
+    data: ByteBufferPtr,
+    length_offset: usize,
+    data_offset: usize,
+}
+
+impl DeltaLengthDecoder {
+    fn new(data: ByteBufferPtr, values: usize) -> Result<Self> {
+        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
+        len_decoder.set_data(data.all(), values)?;
+        let mut lengths = vec![0; values];
+        len_decoder.get(&mut lengths)?;
+
+        Ok(Self {
+            lengths,
+            data,
+            length_offset: 0,
+            data_offset: len_decoder.get_offset(),
+        })
+    }
+
+    fn read<I: OffsetSizeTrait>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let to_read = len.min(self.lengths.len() - self.length_offset);
+
+        output.offsets.reserve(to_read);
+
+        let mut to_read_bytes: usize = 0;
+        let mut offset = output.values.len();
+
+        for length in &self.lengths[self.length_offset..self.length_offset + to_read] {
+            offset = offset.saturating_add(*length as usize);
+            to_read_bytes += *length as usize;
+
+            let offset_i = I::from_usize(offset)
+                .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
+            output.offsets.push(offset_i);
+        }
+
+        output.values.extend_from_slice(
+            &self.data.as_ref()[self.data_offset..self.data_offset + to_read_bytes],
+        );
+
+        self.data_offset += to_read_bytes;
+        self.length_offset += to_read;
+        Ok(to_read)
+    }
+}
+
+/// Decoder for [`Encoding::DELTA_BYTE_ARRAY`]
+struct DeltaStringsDecoder {
+    prefix_lengths: Vec<i32>,
+    suffix_lengths: Vec<i32>,
+    data: ByteBufferPtr,
+    length_offset: usize,
+    data_offset: usize,
+    last_value: Vec<u8>,
+}
+
+impl DeltaStringsDecoder {
+    fn new(data: ByteBufferPtr, values: usize) -> Result<Self> {
+        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix.set_data(data.all(), values)?;
+        let mut prefix_lengths = vec![0; values];
+        prefix.get(&mut prefix_lengths)?;
+
+        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
+        suffix.set_data(data.start_from(prefix.get_offset()), values)?;
+        let mut suffix_lengths = vec![0; values];
+        suffix.get(&mut suffix_lengths)?;
+
+        Ok(Self {
+            prefix_lengths,
+            suffix_lengths,
+            data,
+            length_offset: 0,
+            data_offset: prefix.get_offset() + suffix.get_offset(),
+            last_value: vec![],
+        })
+    }
+
+    fn read<I: OffsetSizeTrait>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
+
+        let to_read = len.min(self.prefix_lengths.len() - self.length_offset);
+
+        output.offsets.reserve(to_read);
+
+        let length_range = self.length_offset..self.length_offset + to_read;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let mut offset = output.values.len();
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+            let total_length = prefix_length + suffix_length;
+
+            // Only the suffix bytes are stored in the data stream
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            offset = offset.saturating_add(total_length);
+
+            let offset_i = I::from_usize(offset)
+                .ok_or_else(|| general_err!("index overflow decoding byte array"))?;
+            output.offsets.push(offset_i);
+
+            // Keep the shared prefix of the previous value, then append the suffix
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+
+            output.values.reserve(total_length);
+            output.values.extend_from_slice(&self.last_value);
+
+            self.data_offset += suffix_length;
+        }
+
+        self.length_offset += to_read;
+        Ok(to_read)
+    }
+}
+
+struct DictionaryDecoder {
+    decoder: RleDecoder,
+    index_buf: Box<[i32; 1024]>,
+    index_offset: usize,
+}
+
+impl DictionaryDecoder {
+    fn new(data: ByteBufferPtr) -> Self {
+        // The first byte of an RLE-encoded data page is the bit width of the indices
+        let bit_width = data[0];
+        let mut decoder = RleDecoder::new(bit_width);
+        decoder.set_data(data.start_from(1));
+
+        Self {
+            decoder,
+            index_buf: Box::new([0; 1024]),
+            index_offset: 1024,
+        }
+    }
+
+    fn read<I: OffsetSizeTrait>(
+        &mut self,
+        output: &mut OffsetBuffer<I>,
+        dict: &OffsetBuffer<I>,
+        len: usize,
+    ) -> Result<usize> {
+        let mut values_read = 0;
+
+        while values_read != len {
+            if self.index_offset == self.index_buf.len() {
+                let decoded = self.decoder.get_batch(self.index_buf.as_mut())?;
+                self.index_offset = 0;
+                if decoded != self.index_buf.len() && decoded < len - values_read {
+                    return Err(ParquetError::EOF(

Review comment:
   I removed this logic and added checks higher up in RecordReader.
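   For context, the removed branch made the dictionary decoder itself error when the RLE decoder produced fewer indices than requested. Below is a rough sketch of what a centralized check above the individual decoders could look like, assuming the caller only ever requests values the page is known to contain. The names here (`ValueDecoder`, `read_values_checked`, `UnexpectedEof`) are invented for illustration and are not the actual RecordReader API from this PR:

   ```rust
   use std::ops::Range;

   /// Hypothetical error type standing in for ParquetError::EOF.
   #[derive(Debug)]
   struct UnexpectedEof(String);

   /// Minimal stand-in for a ColumnValueDecoder-style trait: decode values
   /// into `out`, returning how many were actually produced.
   trait ValueDecoder {
       type Writer;
       fn read(
           &mut self,
           out: &mut Self::Writer,
           range: Range<usize>,
       ) -> Result<usize, UnexpectedEof>;
   }

   /// The kind of check that can live once in the record reader, above all
   /// decoders: if the decoder produced fewer values than the page promised,
   /// fail here instead of in each per-encoding decoder.
   fn read_values_checked<D: ValueDecoder>(
       decoder: &mut D,
       out: &mut D::Writer,
       range: Range<usize>,
   ) -> Result<usize, UnexpectedEof> {
       let requested = range.end - range.start;
       let read = decoder.read(out, range)?;
       if read != requested {
           return Err(UnexpectedEof(format!(
               "expected {} values, decoder produced {}",
               requested, read
           )));
       }
       Ok(read)
   }
   ```

   With a wrapper like this, each per-encoding decoder can simply report how many values it managed to decode, and a single call site decides whether a shortfall is an error, so every encoding gets uniform EOF handling.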