This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new aa626e12de [Parquet] Add ParquetMetadataPushDecoder (#8080)
aa626e12de is described below
commit aa626e12de8bc0d0f56b5349239cae1be8d1a195
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Sep 11 14:08:37 2025 -0700
[Parquet] Add ParquetMetadataPushDecoder (#8080)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/8000
- Closes https://github.com/apache/arrow-rs/issues/8164
# Rationale for this change
Metadata is needed when implementing a push decoder for Parquet:
- https://github.com/apache/arrow-rs/issues/7983
If we want to truly separate IO and CPU, we also need a way to decode the
metadata without explicit IO. Hence this PR provides a way to decode
metadata "push style", where the decoder tells you what byte ranges it needs.
It follows the same API as the parquet push decoder.
This PR also introduces some of the common infrastructure needed for the
parquet push decoder.
# What changes are included in this PR?
1. Add `PushBuffers` to hold byte ranges
2. Add `DecodeResult` to communicate back to the caller
3. Add `ParquetMetaDataPushDecoder` for decoding metadata
# Are these changes tested?
Yes, there are several fully working doc tests that show how to use this API.
# Are there any user-facing changes?
There is a new API.
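For reference, a condensed sketch of the decode loop (adapted from the doc
tests in this PR; `file_len` and `read_range` are placeholders for the
caller's own file length and IO, not APIs added by this change):
```rust
use parquet::DecodeResult;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};

fn decode(file_len: u64) -> Result<ParquetMetaData, ParquetError> {
    // the decoder only needs to know the total file length up front
    let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?;
    loop {
        match decoder.try_decode()? {
            // decoding succeeded
            DecodeResult::Data(metadata) => return Ok(metadata),
            // the decoder reports exactly which byte ranges it still needs
            DecodeResult::NeedsData(ranges) => {
                // `read_range` is a placeholder for whatever IO you perform
                let data = ranges.iter().map(|r| read_range(r)).collect();
                decoder.push_ranges(ranges, data).unwrap();
            }
            // only returned after the metadata has already been produced
            DecodeResult::Finished => unreachable!(),
        }
    }
}
```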
---------
Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: albertlockett <[email protected]>
---
parquet/src/errors.rs | 6 +
parquet/src/file/metadata/mod.rs | 7 +-
parquet/src/file/metadata/push_decoder.rs | 559 ++++++++++++++++++++++++++++++
parquet/src/file/metadata/reader.rs | 2 +-
parquet/src/lib.rs | 17 +
parquet/src/util/mod.rs | 1 +
parquet/src/util/push_buffers.rs | 197 +++++++++++
7 files changed, 785 insertions(+), 4 deletions(-)
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index 93b2c1b7e0..be08245e95 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -52,6 +52,9 @@ pub enum ParquetError {
/// Returned when a function needs more data to complete properly. The `usize` field indicates
/// the total number of bytes required, not the number of additional bytes.
NeedMoreData(usize),
+ /// Returned when a function needs more data to complete properly.
+ /// The `Range<u64>` indicates the range of bytes that are needed.
+ NeedMoreDataRange(std::ops::Range<u64>),
}
impl std::fmt::Display for ParquetError {
@@ -69,6 +72,9 @@ impl std::fmt::Display for ParquetError {
}
ParquetError::External(e) => write!(fmt, "External: {e}"),
ParquetError::NeedMoreData(needed) => write!(fmt, "NeedMoreData: {needed}"),
+ ParquetError::NeedMoreDataRange(range) => {
+ write!(fmt, "NeedMoreDataRange: {}..{}", range.start,
range.end)
+ }
}
}
}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index c331988092..f90143104c 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -40,11 +40,10 @@
//! metadata into parquet files. To work with metadata directly,
//! the following APIs are available:
//!
-//! * [`ParquetMetaDataReader`] for reading
+//! * [`ParquetMetaDataReader`] for reading from a reader for I/O
+//! * [`ParquetMetaDataPushDecoder`] for decoding from bytes without I/O
//! * [`ParquetMetaDataWriter`] for writing.
//!
-//! [`ParquetMetaDataReader`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html
-//! [`ParquetMetaDataWriter`]: https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataWriter.html
//!
//! # Examples
//!
@@ -92,6 +91,7 @@
//! * Same name, different struct
//! ```
mod memory;
+mod push_decoder;
pub(crate) mod reader;
mod writer;
@@ -120,6 +120,7 @@ use crate::schema::types::{
};
#[cfg(feature = "encryption")]
use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
+pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{FooterTail, PageIndexPolicy, ParquetMetaDataReader};
use std::ops::Range;
use std::sync::Arc;
diff --git a/parquet/src/file/metadata/push_decoder.rs b/parquet/src/file/metadata/push_decoder.rs
new file mode 100644
index 0000000000..811caf4fd4
--- /dev/null
+++ b/parquet/src/file/metadata/push_decoder.rs
@@ -0,0 +1,559 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader};
+use crate::DecodeResult;
+
+/// A push decoder for [`ParquetMetaData`].
+///
+/// This structure implements a push-based version of the [`ParquetMetaDataReader`],
+/// which decouples the IO from the metadata decoding logic.
+///
+/// You can use this decoder to customize your IO operations, as shown in the
+/// examples below for minimizing bytes read, prefetching data, or
+/// using async IO.
+///
+/// # Example
+///
+/// The most basic usage is to feed the decoder the byte ranges it requests,
+/// as shown below.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// # let mut buffer = vec![0];
+/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+/// # writer.write(&batch).unwrap();
+/// # writer.close().unwrap();
+/// # Bytes::from(buffer)
+/// # };
+/// # // mimic IO by returning a function that returns the bytes for a given range
+/// # let get_range = |range: &Range<u64>| -> Bytes {
+/// # let start = range.start as usize;
+/// # let end = range.end as usize;
+/// # file_bytes.slice(start..end)
+/// # };
+/// #
+/// # let file_len = file_bytes.len() as u64;
+/// // The `ParquetMetaDataPushDecoder` needs to know the file length.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // try to decode the metadata. If more data is needed, the decoder will tell you what ranges it needs
+/// loop {
+/// match decoder.try_decode() {
+/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+/// Ok(DecodeResult::NeedsData(ranges)) => {
+/// // The decoder needs more data
+/// //
+/// // In this example, we call a function that returns the bytes for each given range.
+/// // In a real application, you would likely read the data from a file or network.
+/// let data = ranges.iter().map(|range| get_range(range)).collect();
+/// // Push the data into the decoder and try to decode again on the next iteration.
+/// decoder.push_ranges(ranges, data).unwrap();
+/// }
+/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
+/// Err(e) => return Err(e),
+/// }
+/// }
+/// # }
+/// ```
+///
+/// # Example with "prefetching"
+///
+/// By default, the [`ParquetMetaDataPushDecoder`] will request only the exact byte
+/// ranges it needs. This minimizes the number of bytes read; however, it
+/// requires at least two IO operations to read the metadata - one to read the
+/// footer and then one to read the metadata.
+///
+/// If the file has a "Page Index" (see [Self::with_page_index_policy]), three
+/// IO operations are required to read the metadata, as the page index is
+/// not part of the normal metadata footer.
+///
+/// To reduce the number of IO operations in systems with high per-operation
+/// overhead (e.g. cloud storage), you can "prefetch" the data and then push
+/// the data into the decoder before calling [`Self::try_decode`]. If you do
+/// not push enough bytes, the decoder will return the ranges that are still
+/// needed.
+///
+/// This approach can also be used when you have the entire file already in memory
+/// for other reasons.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// # fn decode_metadata() -> Result<ParquetMetaData, ParquetError> {
+/// # let file_bytes = {
+/// # let mut buffer = vec![0];
+/// # let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
+/// # let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
+/// # writer.write(&batch).unwrap();
+/// # writer.close().unwrap();
+/// # Bytes::from(buffer)
+/// # };
+/// #
+/// let file_len = file_bytes.len() as u64;
+/// // For this example, we "prefetch" all the bytes which we have in memory,
+/// // but in a real application, you would likely read a chunk from the end
+/// // for example 1MB.
+/// let prefetched_bytes = file_bytes.clone();
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// // push the prefetched bytes into the decoder
+/// decoder.push_ranges(vec![0..file_len], vec![prefetched_bytes]).unwrap();
+/// // The decoder will now be able to decode the metadata. Note that in a real application,
+/// // unless you can guarantee that the pushed data is enough to decode the metadata,
+/// // you still need to call `try_decode` in a loop until it returns `DecodeResult::Data`,
+/// // as shown in the previous example.
+/// match decoder.try_decode() {
+/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+/// other => { panic!("expected DecodeResult::Data, got: {other:?}") }
+/// }
+/// # }
+/// ```
+///
+/// # Example using [`AsyncRead`]
+///
+/// [`ParquetMetaDataPushDecoder`] is designed to work with any data source that can
+/// provide byte ranges, including async IO sources. However, it does not
+/// implement async IO itself. To use async IO, you simply write an async
+/// wrapper around it that reads the required byte ranges and pushes them into the
+/// decoder.
+///
+/// ```rust
+/// # use std::ops::Range;
+/// # use bytes::Bytes;
+/// use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
+/// # use arrow_array::record_batch;
+/// # use parquet::DecodeResult;
+/// # use parquet::arrow::ArrowWriter;
+/// # use parquet::errors::ParquetError;
+/// # use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataPushDecoder};
+/// #
+/// // This function decodes Parquet Metadata from anything that implements
+/// // [`AsyncRead`] and [`AsyncSeek`], such as a `tokio::fs::File`
+/// async fn decode_metadata(
+/// file_len: u64,
+/// mut async_source: impl AsyncRead + AsyncSeek + Unpin
+/// ) -> Result<ParquetMetaData, ParquetError> {
+/// // We need a ParquetMetaDataPushDecoder to decode the metadata.
+/// let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+/// loop {
+/// match decoder.try_decode() {
+/// Ok(DecodeResult::Data(metadata)) => { return Ok(metadata); } // decode successful
+/// Ok(DecodeResult::NeedsData(ranges)) => {
+/// // The decoder needs more data
+/// //
+/// // In this example we use the AsyncRead and AsyncSeek traits to read the
+/// // required ranges from the async source.
+/// let mut data = Vec::with_capacity(ranges.len());
+/// for range in &ranges {
+/// let mut buffer = vec![0; (range.end - range.start) as usize];
+/// async_source.seek(std::io::SeekFrom::Start(range.start)).await?;
+/// async_source.read_exact(&mut buffer).await?;
+/// data.push(Bytes::from(buffer));
+/// }
+/// // Push the data into the decoder and try to decode again on the next iteration.
+/// decoder.push_ranges(ranges, data).unwrap();
+/// }
+/// Ok(DecodeResult::Finished) => { unreachable!("returned metadata in previous match arm") }
+/// Err(e) => return Err(e),
+/// }
+/// }
+/// }
+/// ```
+/// [`AsyncRead`]: tokio::io::AsyncRead
+#[derive(Debug)]
+pub struct ParquetMetaDataPushDecoder {
+ done: bool,
+ metadata_reader: ParquetMetaDataReader,
+ buffers: crate::util::push_buffers::PushBuffers,
+}
+
+impl ParquetMetaDataPushDecoder {
+ /// Create a new `ParquetMetaDataPushDecoder` with the given file length.
+ ///
+ /// By default, this will read page indexes and column indexes. See
+ /// [`ParquetMetaDataPushDecoder::with_page_index_policy`] for more detail.
+ ///
+ /// See examples on [`ParquetMetaDataPushDecoder`].
+ pub fn try_new(file_len: u64) -> Result<Self, ParquetError> {
+ if file_len < 8 {
+ return Err(ParquetError::General(format!(
+ "Parquet files are at least 8 bytes long, but file length is
{file_len}"
+ )));
+ };
+
+ let metadata_reader =
+ ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
+
+ Ok(Self {
+ done: false,
+ metadata_reader,
+ buffers: crate::util::push_buffers::PushBuffers::new(file_len),
+ })
+ }
+
+ /// Enable or disable reading the page index structures described in
+ /// "[Parquet page index] Layout to Support Page Skipping".
+ ///
+ /// Defaults to [`PageIndexPolicy::Optional`]
+ ///
+ /// This requires:
+ /// 1. The Parquet file to have been written with page indexes
+ /// 2. Additional data to be pushed into the decoder (as the page indexes are not part of the thrift footer)
+ ///
+ /// [Parquet page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+ pub fn with_page_index_policy(mut self, page_index_policy: PageIndexPolicy) -> Self {
+ self.metadata_reader = self
+ .metadata_reader
+ .with_page_index_policy(page_index_policy);
+ self
+ }
+
+ /// Push the data into the decoder's buffer.
+ ///
+ /// The decoder does not immediately attempt to decode the metadata
+ /// after pushing data. Instead, it accumulates the pushed data until you
+ /// call [`Self::try_decode`].
+ ///
+ /// # Determining required data
+ ///
+ /// To determine what ranges are required to decode the metadata, you can
+ /// either:
+ ///
+ /// 1. Call [`Self::try_decode`] first to get the exact ranges required (see
+ /// example on [`Self`])
+ ///
+ /// 2. Speculatively push any data that you have available, which may
+ /// include more than the footer data or requested bytes.
+ ///
+ /// Speculatively pushing data can be used when "prefetching" data. See the
+ /// example on [`Self`].
+ pub fn push_ranges(
+ &mut self,
+ ranges: Vec<std::ops::Range<u64>>,
+ buffers: Vec<bytes::Bytes>,
+ ) -> std::result::Result<(), String> {
+ if self.done {
+ return Err(
+ "ParquetMetaDataPushDecoder: cannot push data after decoding
is finished"
+ .to_string(),
+ );
+ }
+ self.buffers.push_ranges(ranges, buffers);
+ Ok(())
+ }
+
+ /// Try to decode the metadata from the pushed data, returning the
+ /// decoded metadata, the ranges of data still needed, or an error.
+ pub fn try_decode(
+ &mut self,
+ ) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
+ if self.done {
+ return Ok(DecodeResult::Finished);
+ }
+
+ // need to have the last 8 bytes of the file to decode the metadata
+ let file_len = self.buffers.file_len();
+ if !self.buffers.has_range(&(file_len - 8..file_len)) {
+ #[expect(clippy::single_range_in_vec_init)]
+ return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
+ }
+
+ // Try to parse the metadata from the buffers we have.
+ //
+ // If we don't have enough data, returns a `ParquetError::NeedMoreData`
+ // with the number of bytes needed to complete the metadata parsing.
+ //
+ // If we have enough data, returns `Ok(())` and we can complete
+ // the metadata parsing.
+ let maybe_metadata = self
+ .metadata_reader
+ .try_parse_sized(&self.buffers, self.buffers.file_len());
+
+ match maybe_metadata {
+ Ok(()) => {
+ // Metadata successfully parsed; take the finished metadata from the reader
+ let metadata = self.metadata_reader.finish()?;
+ self.done = true;
+ Ok(DecodeResult::Data(metadata))
+ }
+
+ Err(ParquetError::NeedMoreData(needed)) => {
+ let needed = needed as u64;
+ let Some(start_offset) = file_len.checked_sub(needed) else {
+ return Err(ParquetError::General(format!(
+ "Parquet metadata reader needs at least {needed}
bytes, but file length is only {file_len}"
+ )));
+ };
+ let needed_range = start_offset..start_offset + needed;
+ // needs `needed_range` bytes at the end of the file
+ Ok(DecodeResult::NeedsData(vec![needed_range]))
+ }
+ Err(ParquetError::NeedMoreDataRange(range)) => Ok(DecodeResult::NeedsData(vec![range])),
+
+ Err(e) => Err(e), // some other error, pass back
+ }
+ }
+}
+
+// These tests use the arrow writer to create a parquet file in memory
+// so they need the arrow feature and the test feature
+#[cfg(all(test, feature = "arrow"))]
+mod tests {
+ use super::*;
+ use crate::arrow::ArrowWriter;
+ use crate::file::properties::WriterProperties;
+ use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
+ use bytes::Bytes;
+ use std::fmt::Debug;
+ use std::ops::Range;
+ use std::sync::{Arc, LazyLock};
+
+ /// It is possible to decode the metadata by pushing the entire file at once, before the decoder asks for any ranges
+ #[test]
+ fn test_metadata_decoder_all_data() {
+ let file_len = test_file_len();
+ let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+ // Push the entire file data into the metadata decoder
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
+
+ // should be able to decode the metadata without needing more data
+ let metadata = expect_data(metadata_decoder.try_decode());
+
+ assert_eq!(metadata.num_row_groups(), 2);
+ assert_eq!(metadata.row_group(0).num_rows(), 200);
+ assert_eq!(metadata.row_group(1).num_rows(), 200);
+ assert!(metadata.column_index().is_some());
+ assert!(metadata.offset_index().is_some());
+ }
+
+ /// It is possible to feed some, but not all, of the footer into the metadata decoder
+ /// before being asked. This avoids multiple IO requests.
+ #[test]
+ fn test_metadata_decoder_prefetch_success() {
+ let file_len = test_file_len();
+ let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+ // simulate pre-fetching the last 2k bytes of the file without asking the decoder
+ let prefetch_range = (file_len - 2 * 1024)..file_len;
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);
+
+ // expect the decoder has enough data to decode the metadata
+ let metadata = expect_data(metadata_decoder.try_decode());
+ expect_finished(metadata_decoder.try_decode());
+ assert_eq!(metadata.num_row_groups(), 2);
+ assert_eq!(metadata.row_group(0).num_rows(), 200);
+ assert_eq!(metadata.row_group(1).num_rows(), 200);
+ assert!(metadata.column_index().is_some());
+ assert!(metadata.offset_index().is_some());
+ }
+
+ /// It is possible to pre-fetch some, but not all, of the necessary data
+ #[test]
+ fn test_metadata_decoder_prefetch_retry() {
+ let file_len = test_file_len();
+ let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+ // simulate pre-fetching the last 1500 bytes of the file.
+ // this is enough to read the footer thrift metadata, but not the offset indexes
+ let prefetch_range = (file_len - 1500)..file_len;
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![prefetch_range]);
+
+ // expect another request is needed to read the offset indexes (note
+ // try_decode only returns NeedsData once, whereas without any prefetching it would
+ // return NeedsData three times)
+ let ranges = expect_needs_data(metadata_decoder.try_decode());
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+ // expect the decoder has enough data to decode the metadata
+ let metadata = expect_data(metadata_decoder.try_decode());
+ expect_finished(metadata_decoder.try_decode());
+
+ assert_eq!(metadata.num_row_groups(), 2);
+ assert_eq!(metadata.row_group(0).num_rows(), 200);
+ assert_eq!(metadata.row_group(1).num_rows(), 200);
+ assert!(metadata.column_index().is_some());
+ assert!(metadata.offset_index().is_some());
+ }
+
+ /// Decode the metadata incrementally, simulating a scenario where exactly the data needed
+ /// is read in each step
+ #[test]
+ fn test_metadata_decoder_incremental() {
+ let file_len = TEST_FILE_DATA.len() as u64;
+ let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len).unwrap();
+ let ranges = expect_needs_data(metadata_decoder.try_decode());
+ assert_eq!(ranges.len(), 1);
+ assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+ // expect the first request to read the footer
+ let ranges = expect_needs_data(metadata_decoder.try_decode());
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+ // expect the second request to read the offset indexes
+ let ranges = expect_needs_data(metadata_decoder.try_decode());
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+ // no third request; the decoder now has everything it needs and returns the metadata
+ let metadata = expect_data(metadata_decoder.try_decode());
+ expect_finished(metadata_decoder.try_decode());
+
+ assert_eq!(metadata.num_row_groups(), 2);
+ assert_eq!(metadata.row_group(0).num_rows(), 200);
+ assert_eq!(metadata.row_group(1).num_rows(), 200);
+ assert!(metadata.column_index().is_some());
+ assert!(metadata.offset_index().is_some());
+ }
+
+ /// Decode the metadata incrementally, but without reading the page indexes
+ /// (so only two requests)
+ #[test]
+ fn test_metadata_decoder_incremental_no_page_index() {
+ let file_len = TEST_FILE_DATA.len() as u64;
+ let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_len)
+ .unwrap()
+ .with_page_index_policy(PageIndexPolicy::Skip);
+ let ranges = expect_needs_data(metadata_decoder.try_decode());
+ assert_eq!(ranges.len(), 1);
+ assert_eq!(ranges[0], test_file_len() - 8..test_file_len());
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+ // expect the first request to read the footer
+ let ranges = expect_needs_data(metadata_decoder.try_decode());
+ push_ranges_to_metadata_decoder(&mut metadata_decoder, ranges);
+
+ // expect NO second request for the offset indexes; the decoder should just return the metadata
+ let metadata = expect_data(metadata_decoder.try_decode());
+ expect_finished(metadata_decoder.try_decode());
+
+ assert_eq!(metadata.num_row_groups(), 2);
+ assert_eq!(metadata.row_group(0).num_rows(), 200);
+ assert_eq!(metadata.row_group(1).num_rows(), 200);
+ assert!(metadata.column_index().is_none()); // of course, we did not read the column index
+ assert!(metadata.offset_index().is_none()); // or the offset index
+ }
+
+ static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
+ // Input batch has 400 rows, with 3 columns: "a", "b", "c"
+ // Note c is a different type (so the data page sizes will be different)
+ let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+ let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+ let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+ if i % 2 == 0 {
+ format!("string_{i}")
+ } else {
+ format!("A string larger than 12 bytes and thus not inlined
{i}")
+ }
+ })));
+
+ RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
+ });
+
+ /// Create a parquet file in memory for testing. See [`test_file_range`] for details.
+ static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+ let input_batch = &TEST_BATCH;
+ let mut output = Vec::new();
+
+ let writer_options = WriterProperties::builder()
+ .set_max_row_group_size(200)
+ .set_data_page_row_count_limit(100)
+ .build();
+ let mut writer =
+ ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
+
+ // since the limits are only enforced on batch boundaries, write the input
+ // batch in chunks of 50
+ let mut row_remain = input_batch.num_rows();
+ while row_remain > 0 {
+ let chunk_size = row_remain.min(50);
+ let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
+ writer.write(&chunk).unwrap();
+ row_remain -= chunk_size;
+ }
+ writer.close().unwrap();
+ Bytes::from(output)
+ });
+
+ /// Return the length of the test file in bytes
+ fn test_file_len() -> u64 {
+ TEST_FILE_DATA.len() as u64
+ }
+
+ /// Return the range of the entire test file
+ fn test_file_range() -> Range<u64> {
+ 0..test_file_len()
+ }
+
+ /// Return a slice of the test file data from the given range
+ pub fn test_file_slice(range: Range<u64>) -> Bytes {
+ let start: usize = range.start.try_into().unwrap();
+ let end: usize = range.end.try_into().unwrap();
+ TEST_FILE_DATA.slice(start..end)
+ }
+
+ /// Push the given ranges to the metadata decoder, simulating reading from a file
+ fn push_ranges_to_metadata_decoder(
+ metadata_decoder: &mut ParquetMetaDataPushDecoder,
+ ranges: Vec<Range<u64>>,
+ ) {
+ let data = ranges
+ .iter()
+ .map(|range| test_file_slice(range.clone()))
+ .collect::<Vec<_>>();
+ metadata_decoder.push_ranges(ranges, data).unwrap();
+ }
+
+ /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
+ fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
+ match result.expect("Expected Ok(DecodeResult::Data(T))") {
+ DecodeResult::Data(data) => data,
+ result => panic!("Expected DecodeResult::Data, got {result:?}"),
+ }
+ }
+
+ /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
+ fn expect_needs_data<T: Debug>(
+ result: Result<DecodeResult<T>, ParquetError>,
+ ) -> Vec<Range<u64>> {
+ match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
+ DecodeResult::NeedsData(ranges) => ranges,
+ result => panic!("Expected DecodeResult::NeedsData, got
{result:?}"),
+ }
+ }
+
+ fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
+ match result.expect("Expected Ok(DecodeResult::Finished)") {
+ DecodeResult::Finished => {}
+ result => panic!("Expected DecodeResult::Finished, got
{result:?}"),
+ }
+ }
+}
diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs
index 4b97b5fc55..8d92d1e0aa 100644
--- a/parquet/src/file/metadata/reader.rs
+++ b/parquet/src/file/metadata/reader.rs
@@ -69,7 +69,7 @@ use crate::file::page_index::offset_index::OffsetIndexMetaData;
/// assert!(metadata.column_index().is_some());
/// assert!(metadata.offset_index().is_some());
/// ```
-#[derive(Default)]
+#[derive(Default, Debug)]
pub struct ParquetMetaDataReader {
metadata: Option<ParquetMetaData>,
column_index: PageIndexPolicy,
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index 1142a1c4a0..b1100c4bc4 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -163,6 +163,8 @@ pub mod format;
#[macro_use]
pub mod data_type;
+use std::fmt::Debug;
+use std::ops::Range;
// Exported for external use, such as benchmarks
#[cfg(feature = "experimental")]
#[doc(hidden)]
@@ -188,5 +190,20 @@ pub mod schema;
pub mod thrift;
+/// What data is needed to read the next item from a decoder.
+///
+/// This is used to communicate between the decoder and the caller
+/// to indicate what data is needed next, or what the result of decoding is.
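+///
+/// Callers typically drive a decoder in a loop, matching on each variant.
+/// A sketch (`decoder` stands in for any push decoder that produces `T`):
+///
+/// ```text
+/// match decoder.try_decode()? {
+///     DecodeResult::NeedsData(ranges) => { /* fetch `ranges` and push them */ }
+///     DecodeResult::Data(item) => { /* use the decoded item */ }
+///     DecodeResult::Finished => { /* nothing more to produce */ }
+/// }
+/// ```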
+#[derive(Debug)]
+pub enum DecodeResult<T: Debug> {
+ /// The ranges of data necessary to proceed
+ // TODO: distinguish between minimum needed to make progress and what could be used?
+ NeedsData(Vec<Range<u64>>),
+ /// The decoder produced an output item
+ Data(T),
+ /// The decoder finished processing
+ Finished,
+}
+
#[cfg(feature = "variant_experimental")]
pub mod variant;
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index 1431132473..145cdd693e 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -20,6 +20,7 @@ pub mod bit_util;
mod bit_pack;
pub(crate) mod interner;
+pub mod push_buffers;
#[cfg(any(test, feature = "test_common"))]
pub(crate) mod test_common;
pub mod utf8;
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
new file mode 100644
index 0000000000..b30f91a81b
--- /dev/null
+++ b/parquet/src/util/push_buffers.rs
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::ParquetError;
+use crate::file::reader::{ChunkReader, Length};
+use bytes::Bytes;
+use std::fmt::Display;
+use std::ops::Range;
+
+/// Holds multiple buffers of data
+///
+/// This is the in-memory buffer for the ParquetDecoder and ParquetMetadataDecoders
+///
+/// Features:
+/// 1. Zero copy
+/// 2. Non-contiguous ranges of bytes
+///
+/// # Non Coalescing
+///
+/// This buffer does not coalesce (merge adjacent ranges of bytes into a
+/// single range). Coalescing at this level would require copying the data, but
+/// the caller may already have the needed data in a single buffer, which would
+/// require no copying.
+///
+/// Thus, the implementation defers to the caller to coalesce subsequent requests
+/// if desired.
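+///
+/// For illustration, a hypothetical sequence of pushes:
+///
+/// ```text
+/// // pushed: 0..100 and 100..200 are kept as two separate buffers
+/// // has_range(&(50..150)) -> false: no single pushed range covers the
+/// // request, so the caller must push one buffer spanning it
+/// ```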
+#[derive(Debug, Clone)]
+pub(crate) struct PushBuffers {
+ /// the virtual "offset" of this buffers (added to any request)
+ offset: u64,
+ /// The total length of the file being decoded
+ file_len: u64,
+ /// The ranges of data that are available for decoding (not adjusted for offset)
+ ranges: Vec<Range<u64>>,
+ /// The buffers of data that can be used to decode the Parquet file
+ buffers: Vec<Bytes>,
+}
+
+impl Display for PushBuffers {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ writeln!(
+ f,
+ "Buffers (offset: {}, file_len: {})",
+ self.offset, self.file_len
+ )?;
+ writeln!(f, "Available Ranges (w/ offset):")?;
+ for range in &self.ranges {
+ writeln!(
+ f,
+ " {}..{} ({}..{}): {} bytes",
+ range.start,
+ range.end,
+ range.start + self.offset,
+ range.end + self.offset,
+ range.end - range.start
+ )?;
+ }
+
+ Ok(())
+ }
+}
+
+impl PushBuffers {
+ /// Create a new `PushBuffers` instance with the given file length
+ pub fn new(file_len: u64) -> Self {
+ Self {
+ offset: 0,
+ file_len,
+ ranges: Vec::new(),
+ buffers: Vec::new(),
+ }
+ }
+
+ /// Push all the ranges and buffers
+ pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
+ assert_eq!(
+ ranges.len(),
+ buffers.len(),
+ "Number of ranges must match number of buffers"
+ );
+ for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) {
+ self.push_range(range, buffer);
+ }
+ }
+
+ /// Push a new range and its associated buffer
+ pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
+ assert_eq!(
+ (range.end - range.start) as usize,
+ buffer.len(),
+ "Range length must match buffer length"
+ );
+ self.ranges.push(range);
+ self.buffers.push(buffer);
+ }
+
+ /// Returns true if the `PushBuffers` contains data for the given range
+ pub fn has_range(&self, range: &Range<u64>) -> bool {
+ self.ranges
+ .iter()
+ .any(|r| r.start <= range.start && r.end >= range.end)
+ }
+
+ fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
+ self.ranges.iter().zip(self.buffers.iter())
+ }
+
+ /// return the file length of the Parquet file being read
+ pub fn file_len(&self) -> u64 {
+ self.file_len
+ }
+
+ /// Specify a new offset
+ pub fn with_offset(mut self, offset: u64) -> Self {
+ self.offset = offset;
+ self
+ }
+}
+
+impl Length for PushBuffers {
+ fn len(&self) -> u64 {
+ self.file_len
+ }
+}
+
+/// Less efficient implementation of `Read` for `PushBuffers`
+impl std::io::Read for PushBuffers {
+ fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+ // Find the range that contains the start offset
+ let mut found = false;
+ for (range, data) in self.iter() {
+ if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
+ // Found the range, figure out the starting offset in the buffer
+ let start_offset = (self.offset - range.start) as usize;
+ let end_offset = start_offset + buf.len();
+ let slice = data.slice(start_offset..end_offset);
+ buf.copy_from_slice(slice.as_ref());
+ found = true;
+ break;
+ }
+ }
+ if found {
+ // We found the range: advance our offset and return the number of bytes read
+ self.offset += buf.len() as u64;
+ Ok(buf.len())
+ } else {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::UnexpectedEof,
+ "No data available in Buffers",
+ ))
+ }
+ }
+}
+
+impl ChunkReader for PushBuffers {
+ type T = Self;
+
+ fn get_read(&self, start: u64) -> Result<Self::T, ParquetError> {
+ Ok(self.clone().with_offset(self.offset + start))
+ }
+
+ fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
+ if start > self.file_len {
+ return Err(ParquetError::General(format!(
+ "Requested start {start} is beyond the end of the file (file
length: {})",
+ self.file_len
+ )));
+ }
+
+ // find the range that contains the start offset
+ for (range, data) in self.iter() {
+ if range.start <= start && range.end >= start + length as u64 {
+ // Found the range, figure out the starting offset in the buffer
+ let start_offset = (start - range.start) as usize;
+ return Ok(data.slice(start_offset..start_offset + length));
+ }
+ }
+ // Signal that we need more data
+ let requested_end = start + length as u64;
+ Err(ParquetError::NeedMoreDataRange(start..requested_end))
+ }
+}