This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new aac969de60 ParquetPushDecoder API to clear all buffered ranges (#9624)
aac969de60 is described below
commit aac969de60c8a9c87d5face8592f657492cdff67
Author: Nathan <[email protected]>
AuthorDate: Tue Apr 7 17:11:53 2026 -0400
ParquetPushDecoder API to clear all buffered ranges (#9624)
## Which issue does this PR close?
- Closes #8676
## Rationale for this change
`ParquetPushDecoder` releases buffered data only for the exact ranges it
requested, so larger speculative ranges pushed by the caller can remain
buffered in `PushBuffers`. This adds a way for callers to explicitly release
those non-exact ranges.
## What changes are included in this PR?
This adds `clear_all_ranges()`, which clears all byte ranges still
staged in the decoder's internal `PushBuffers`.
## Are these changes tested?
Partially. A new test verifies that the decoder's buffer is empty after
clearing and that clearing does not interrupt the active row group reader.
## Are there any user-facing changes?
Yes, this adds a new public `clear_all_ranges()` API on
`ParquetPushDecoder`.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/arrow/push_decoder/mod.rs | 72 ++++++++++++++++++++++
.../src/arrow/push_decoder/reader_builder/mod.rs | 5 ++
parquet/src/arrow/push_decoder/remaining.rs | 5 ++
parquet/src/util/push_buffers.rs | 7 +++
4 files changed, 89 insertions(+)
diff --git a/parquet/src/arrow/push_decoder/mod.rs
b/parquet/src/arrow/push_decoder/mod.rs
index cdb0715edb..24384471a4 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -365,6 +365,15 @@ impl ParquetPushDecoder {
pub fn buffered_bytes(&self) -> u64 {
self.state.buffered_bytes()
}
+
+ /// Clear any staged byte ranges currently buffered for future decode work.
+ ///
+ /// This clears byte ranges still owned by the decoder's internal
+ /// `PushBuffers`, including larger speculative ranges that were pushed but
+ /// never exactly requested. It does not affect any data that has already
+ /// been handed off to an active [`ParquetRecordBatchReader`].
+ ///
+ /// After this call [`Self::buffered_bytes`] reports 0. If the decoder later
+ /// needs any of the released bytes, subsequent calls to [`Self::try_decode`]
+ /// will request them again, so callers must be prepared to re-push them.
+ pub fn clear_all_ranges(&mut self) {
+ self.state.clear_all_ranges();
+ }
}
/// Internal state machine for the [`ParquetPushDecoder`]
@@ -573,6 +582,20 @@ impl ParquetDecoderState {
ParquetDecoderState::Finished => 0,
}
}
+
+ /// Clear any staged ranges currently buffered in the decoder.
+ ///
+ /// In both the `ReadingRowGroup` and `DecodingRowGroup` states the staged
+ /// bytes are held by `remaining_row_groups`, so that is what gets cleared.
+ /// The `record_batch_reader` of an in-progress row group is deliberately
+ /// left untouched so it can keep decoding the data it already holds.
+ /// `Finished` has nothing buffered, so clearing it is a no-op.
+ fn clear_all_ranges(&mut self) {
+ match self {
+ ParquetDecoderState::ReadingRowGroup {
+ remaining_row_groups,
+ } => remaining_row_groups.clear_all_ranges(),
+ ParquetDecoderState::DecodingRowGroup {
+ record_batch_reader: _,
+ remaining_row_groups,
+ } => remaining_row_groups.clear_all_ranges(),
+ ParquetDecoderState::Finished => {}
+ }
+ }
}
#[cfg(test)]
@@ -665,6 +688,55 @@ mod test {
assert_eq!(all_output, *TEST_BATCH);
}
+ /// Releasing staged ranges should free speculative buffers without affecting
+ /// the active row group reader.
+ #[test]
+ fn test_decoder_clear_all_ranges() {
+ let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+ .unwrap()
+ .with_batch_size(100)
+ .build()
+ .unwrap();
+
+ // Push the whole file up front: a speculative range larger than any
+ // single request the decoder will make.
+ decoder
+ .push_range(test_file_range(), TEST_FILE_DATA.clone())
+ .unwrap();
+ assert_eq!(decoder.buffered_bytes(), test_file_len());
+
+ // The current row group reader is built from the prefetched bytes, but
+ // the speculative full-file range remains staged in the decoder.
+ let batch1 = expect_data(decoder.try_decode());
+ assert_eq!(batch1, TEST_BATCH.slice(0, 100));
+ assert_eq!(decoder.buffered_bytes(), test_file_len());
+
+ // Release every staged range, including the speculative full-file range.
+ decoder.clear_all_ranges();
+ assert_eq!(decoder.buffered_bytes(), 0);
+
+ // The active reader still owns the current row group's bytes, so it can
+ // continue decoding without consulting PushBuffers.
+ let batch2 = expect_data(decoder.try_decode());
+ assert_eq!(batch2, TEST_BATCH.slice(100, 100));
+ assert_eq!(decoder.buffered_bytes(), 0);
+
+ // Moving to the next row group now requires the decoder to ask for data
+ // again because the staged speculative ranges were released.
+ let ranges = expect_needs_data(decoder.try_decode());
+ let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
+ push_ranges_to_decoder(&mut decoder, ranges);
+ assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+
+ // Exactly-requested ranges are consumed when the reader is built, so the
+ // buffer is empty again while the second row group decodes.
+ let batch3 = expect_data(decoder.try_decode());
+ assert_eq!(batch3, TEST_BATCH.slice(200, 100));
+ assert_eq!(decoder.buffered_bytes(), 0);
+
+ let batch4 = expect_data(decoder.try_decode());
+ assert_eq!(batch4, TEST_BATCH.slice(300, 100));
+ assert_eq!(decoder.buffered_bytes(), 0);
+
+ expect_finished(decoder.try_decode());
+ }
+
/// Decode the entire file incrementally, simulating partial reads
#[test]
fn test_decoder_partial() {
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index d3d78ca7c2..922d8070c0 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -212,6 +212,11 @@ impl RowGroupReaderBuilder {
self.buffers.buffered_bytes()
}
+ /// Clear any staged ranges currently buffered for future decode work.
+ ///
+ /// Drops every range held in this builder's `buffers`. Any bytes needed
+ /// later for building a row group reader must be pushed again.
+ pub fn clear_all_ranges(&mut self) {
+ self.buffers.clear_all_ranges();
+ }
+
/// take the current state, leaving None in its place.
///
/// Returns an error if there the state wasn't put back after the previous
diff --git a/parquet/src/arrow/push_decoder/remaining.rs
b/parquet/src/arrow/push_decoder/remaining.rs
index 4613fda087..2986ca0da8 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -70,6 +70,11 @@ impl RemainingRowGroups {
self.row_group_reader_builder.buffered_bytes()
}
+ /// Clear any staged ranges currently buffered for future decode work.
+ ///
+ /// Delegates to the underlying `RowGroupReaderBuilder`, which owns the
+ /// staged buffers.
+ pub fn clear_all_ranges(&mut self) {
+ self.row_group_reader_builder.clear_all_ranges();
+ }
+
/// returns [`ParquetRecordBatchReader`] suitable for reading the next
/// group of rows from the Parquet data, or the list of data ranges still
/// needed to proceed
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
index 0c00cf9bd5..eb4982fb3c 100644
--- a/parquet/src/util/push_buffers.rs
+++ b/parquet/src/util/push_buffers.rs
@@ -154,6 +154,13 @@ impl PushBuffers {
self.ranges = new_ranges;
self.buffers = new_buffers;
}
+
+ /// Clear all buffered ranges and their corresponding data.
+ ///
+ /// `ranges` and `buffers` appear to be parallel collections (they are
+ /// replaced together elsewhere in this impl), so both are emptied together
+ /// to keep them consistent.
+ // NOTE(review): the `arrow` gate presumably avoids a dead-code warning when
+ // the push decoder (the only caller added here) is compiled out — confirm.
+ #[cfg(feature = "arrow")]
+ pub fn clear_all_ranges(&mut self) {
+ self.ranges.clear();
+ self.buffers.clear();
+ }
}
impl Length for PushBuffers {