alamb commented on code in PR #7997:
URL: https://github.com/apache/arrow-rs/pull/7997#discussion_r2229645049
##########
parquet/src/arrow/arrow_reader/decoder.rs:
##########
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A Parquet "Push" Decoder for decoding values from a Parquet file
+//! with data provided by the caller (rather than directly read from an
+//! underlying reader).
+
+use crate::arrow::arrow_reader::ReadPlan;
+use crate::errors::ParquetError;
+use crate::file::metadata::ParquetMetaDataReader;
+use crate::file::reader::{ChunkReader, Length};
+use arrow_array::RecordBatch;
+use bytes::Bytes;
+use std::collections::VecDeque;
+use std::ops::Range;
+
+/// A builder for [`ParquetDecoder`].
+#[derive(Debug)]
+pub struct ParquetDecoderBuilder {
+    file_len: u64,
+    // TODO optional metadata
+    // Configuration options for the decoder
+    // e.g., batch size, compression, etc.
+}
+
+impl ParquetDecoderBuilder {
+    /// Create a new `ParquetDecoderBuilder` with default options
+    ///
+    /// The file length must be specified
+    pub fn new(file_len: u64) -> Self {
+        Self { file_len }
+    }
+
+    /// Create the decoder with the configured options
+    pub fn build(self) -> Result<ParquetDecoder, ParquetError> {
+        let Self { file_len } = self;
+        // Initialize the decoder with the configured options
+        Ok(ParquetDecoder {
+            state: ParquetDecoderState::Start { file_len },
+        })
+    }
+}
+
+/// A description of what data is needed to read the next batch of data
+/// from a Parquet file.
+#[derive(Debug)]
+pub enum DecodeResult {
+    /// The decoder needs more data to make progress.
+    NeedsData {
+        // TODO distinguish between the minimum needed to make progress and what could be used?
+        /// The ranges of data from the underlying reader that are needed
+        ranges: Vec<Range<u64>>,
+    },
+    /// The decoder produced a new batch of data
+    Batch(RecordBatch),
+    /// The decoder finished processing all data requested
+    Finished,
+}
+
+/// A push based Parquet Decoder
+///
+/// This is a lower level interface for decoding Parquet data that does not
+/// require an underlying reader, and therefore offers finer control over
+/// how data is fetched and decoded.
+#[derive(Debug)]
+pub struct ParquetDecoder {
+    state: ParquetDecoderState,
+}
+
+impl ParquetDecoder {
+    /// Attempt to decode the next batch of data
+    ///
+    /// If the decoder needs more data to proceed, it returns
+    /// [`DecodeResult::NeedsData`] describing the ranges it requires
+    ///
+    /// This returns [`DecodeResult::Finished`] if the decoder has
+    /// finished processing all requested data
+    pub fn try_decode(&mut self) -> Result<DecodeResult, ParquetError> {
+        // take the existing state, leaving the decoder in the Finished state
+        let state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
+        match state {
+            ParquetDecoderState::Start { file_len } => {
+                let Some(start_offset) = file_len.checked_sub(8) else {
+                    return Err(ParquetError::General(format!(
+                        "Parquet files are at least 8 bytes long, but file length is {file_len}"
+                    )));
+                };
+                // Remain in the Start state until the footer bytes are pushed
+                self.state = ParquetDecoderState::Start { file_len };
+                // The decoder is starting, it needs to read the metadata first
+                Ok(DecodeResult::NeedsData {
+                    // the 8 byte footer at the end of the file
+                    ranges: vec![start_offset..file_len],
+                })
+            }
+            ParquetDecoderState::DecodingMetadata {
+                file_len,
+                buffers,
+                mut metadata_decoder,
+            } => {
+                match metadata_decoder.try_parse_sized(&buffers, file_len) {
+                    Ok(_) => {
+                        // Metadata successfully parsed, now we can proceed to decode the row groups
+                        todo!();
+                    }
+                    Err(ParquetError::NeedMoreData(needed)) => {
+                        let needed = needed as u64;
+                        let Some(start_offset) = file_len.checked_sub(needed) else {
+                            return Err(ParquetError::General(format!(
+                                "Parquet metadata reader needs at least {needed} bytes, but file length is {file_len}"
+                            )));
+                        };
+                        // Restore the state so the caller can push the requested bytes
+                        self.state = ParquetDecoderState::DecodingMetadata {
+                            file_len,
+                            buffers,
+                            metadata_decoder,
+                        };
+                        // needs this many more bytes at the end of the file
+                        Ok(DecodeResult::NeedsData {
+                            ranges: vec![start_offset..start_offset + needed],
+                        })
+                    }
+                    Err(e) => Err(e), // pass through other errors
+                }
+            }
+            ParquetDecoderState::DecodingRowGroup { .. } => {
+                todo!()
+            }
+            ParquetDecoderState::Finished => Ok(DecodeResult::Finished),
+        }
+    }
+
+    /// Push data into the decoder for processing
+    ///
+    /// This should correspond to the data ranges requested by the decoder
+    pub fn push_data(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+        data: Vec<Bytes>,
+    ) -> Result<(), ParquetError> {
+        match self.state {
+            ParquetDecoderState::Start { file_len } => {
+                let buffers = Buffers {
+                    file_len,
+                    offset: 0,
+                    ranges,
+                    buffers: data,
+                };
+                self.state = ParquetDecoderState::DecodingMetadata {
+                    file_len,
+                    buffers,
+                    metadata_decoder: ParquetMetaDataReader::new(), // Initialize the metadata decoder
+                };
+                Ok(())
+            }
+            ParquetDecoderState::DecodingMetadata { .. } => {
+                todo!()
+            }
+            ParquetDecoderState::DecodingRowGroup { .. } => {
+                todo!()
+            }
+            ParquetDecoderState::Finished => Err(ParquetError::General(
+                "Cannot push data to a finished decoder".to_string(),
+            )),
+        }
+    }
+}
+
+#[derive(Debug)]
+enum ParquetDecoderState {
+    /// Starting state (reading footer)
+    Start { file_len: u64 },
+    /// The decoder is reading and decoding the metadata of the Parquet file
+    DecodingMetadata {
+        file_len: u64,
+        buffers: Buffers,
+        metadata_decoder: ParquetMetaDataReader,
+    },
+    /// The decoder is actively decoding a RowGroup
+    DecodingRowGroup {
+        file_len: u64,
+        current_plan: ReadPlan,
+        current_row_group: usize,
+        // Row groups to decode after this one
+        remaining_row_groups: VecDeque<ReadPlan>,
+    },
+    /// The decoder has finished processing all data
+    Finished,
+}
+
+/// Holds multiple buffers of data that have been requested by the ParquetDecoder
+#[derive(Debug, Clone)]
+pub struct Buffers {
+    /// The virtual "offset" of these buffers (added to any request)
+    offset: u64,
+    /// The total length of the file being decoded
+    file_len: u64,
+    /// The ranges of data that are available for decoding (not adjusted for offset)
+    ranges: Vec<Range<u64>>,
+    /// The buffers of data that can be used to decode the Parquet file
+    buffers: Vec<Bytes>,
+}
+
+impl Buffers {
+    fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
+        self.ranges.iter().zip(self.buffers.iter())
+    }
+
+    /// Specify a new offset
+    pub fn with_offset(mut self, offset: u64) -> Self {
+        self.offset = offset;
+        self
+    }
+}
+
+impl Length for Buffers {
+    fn len(&self) -> u64 {
+        self.file_len
+    }
+}
+
+impl std::io::Read for Buffers {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        // Find a single range that contains all the requested bytes
+        // (reads spanning multiple pushed ranges are not supported)
+        let mut found = false;
+        for (range, data) in self.iter() {
+            if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
+                // Found the range, figure out the starting offset in the buffer
+                let start_offset = (self.offset - range.start) as usize;
+                let end_offset = start_offset + buf.len();
+                let slice = data.slice(start_offset..end_offset);
+                buf.copy_from_slice(slice.as_ref());
+                found = true;
+                break;
+            }
+        }
+        if found {
+            // advance our offset past the bytes just read
+            self.offset += buf.len() as u64;
+            Ok(buf.len())
+        } else {
+            Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                "No data available",
+            ))
+        }
+    }
+}
+
+impl ChunkReader for Buffers {

Review Comment:
   I am actually pretty pleased -- the lower level APIs are kind of messy (this ChunkReader thing is 🤮), but I think if I handle it properly down here we'll be able to reuse almost all of the existing async reader machinery.
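
For context, here is a minimal sketch of how a caller might drive this push API, assuming the `ParquetDecoderBuilder` / `DecodeResult` / `push_data` surface shown in the diff above. The metadata and row group paths are still `todo!()` in this draft, so the loop is illustrative only; the export path in the `use` statement and the `fetch_ranges` callback are both assumptions standing in for however the caller actually obtains bytes (local file, object store, network):

```rust
use std::ops::Range;

use arrow_array::RecordBatch;
use bytes::Bytes;
use parquet::errors::ParquetError;
// Assumed export path for the new types; they are not public in this draft
use parquet::arrow::arrow_reader::{DecodeResult, ParquetDecoderBuilder};

/// Drive the push decoder to completion, fetching bytes on demand.
///
/// `fetch_ranges` is a hypothetical callback that returns one `Bytes`
/// buffer per requested range.
fn drive_decoder(
    file_len: u64,
    fetch_ranges: impl Fn(&[Range<u64>]) -> Vec<Bytes>,
) -> Result<Vec<RecordBatch>, ParquetError> {
    let mut decoder = ParquetDecoderBuilder::new(file_len).build()?;
    let mut batches = Vec::new();
    loop {
        match decoder.try_decode()? {
            // The decoder reports which byte ranges it needs next; the
            // caller fetches them however it likes and pushes them in
            DecodeResult::NeedsData { ranges } => {
                let data = fetch_ranges(&ranges);
                decoder.push_data(ranges, data)?;
            }
            // A fully decoded Arrow batch is handed back to the caller
            DecodeResult::Batch(batch) => batches.push(batch),
            // All requested data has been decoded
            DecodeResult::Finished => return Ok(batches),
        }
    }
}
```

The appeal of the push model is visible in the loop: all I/O happens at the caller's discretion between `try_decode` calls, which is what should make it possible to layer the existing sync and async reader machinery on top of this decoder.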