This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new aac969de60 ParquetPushDecoder API to clear all buffered ranges (#9624)
aac969de60 is described below

commit aac969de60c8a9c87d5face8592f657492cdff67
Author: Nathan <[email protected]>
AuthorDate: Tue Apr 7 17:11:53 2026 -0400

    ParquetPushDecoder API to clear all buffered ranges (#9624)
    
    ## Which issue does this PR close?
    - Closes #8676
    
    ## Rationale for this change
    `ParquetPushDecoder` clears ranges that exactly match what it requested,
    but larger, speculatively pushed ranges can remain buffered in
    `PushBuffers`. This adds a way for callers to explicitly release those
    non-exact ranges.
    
    ## What changes are included in this PR?
    This adds `clear_all_ranges()`, which clears all byte ranges still
    staged in the decoder's internal `PushBuffers`.
    
    ## Are these changes tested?
    Partially. A test was added to verify that the buffer is empty after
    clearing and that clearing does not interrupt the active row group reader.
    
    ## Are there any user-facing changes?
    Yes, this adds a new public `clear_all_ranges()` API on
    `ParquetPushDecoder`.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/arrow/push_decoder/mod.rs              | 72 ++++++++++++++++++++++
 .../src/arrow/push_decoder/reader_builder/mod.rs   |  5 ++
 parquet/src/arrow/push_decoder/remaining.rs        |  5 ++
 parquet/src/util/push_buffers.rs                   |  7 +++
 4 files changed, 89 insertions(+)

diff --git a/parquet/src/arrow/push_decoder/mod.rs 
b/parquet/src/arrow/push_decoder/mod.rs
index cdb0715edb..24384471a4 100644
--- a/parquet/src/arrow/push_decoder/mod.rs
+++ b/parquet/src/arrow/push_decoder/mod.rs
@@ -365,6 +365,15 @@ impl ParquetPushDecoder {
     pub fn buffered_bytes(&self) -> u64 {
         self.state.buffered_bytes()
     }
+
+    /// Clear any staged byte ranges currently buffered for future decode work.
+    ///
+    /// This clears byte ranges still owned by the decoder's internal
+    /// `PushBuffers`. It does not affect any data that has already been handed
+    /// off to an active [`ParquetRecordBatchReader`].
+    pub fn clear_all_ranges(&mut self) {
+        self.state.clear_all_ranges();
+    }
 }
 
 /// Internal state machine for the [`ParquetPushDecoder`]
@@ -573,6 +582,20 @@ impl ParquetDecoderState {
             ParquetDecoderState::Finished => 0,
         }
     }
+
+    /// Clear any staged ranges currently buffered in the decoder.
+    fn clear_all_ranges(&mut self) {
+        match self {
+            ParquetDecoderState::ReadingRowGroup {
+                remaining_row_groups,
+            } => remaining_row_groups.clear_all_ranges(),
+            ParquetDecoderState::DecodingRowGroup {
+                record_batch_reader: _,
+                remaining_row_groups,
+            } => remaining_row_groups.clear_all_ranges(),
+            ParquetDecoderState::Finished => {}
+        }
+    }
 }
 
 #[cfg(test)]
@@ -665,6 +688,55 @@ mod test {
         assert_eq!(all_output, *TEST_BATCH);
     }
 
+    /// Releasing staged ranges should free speculative buffers without 
affecting
+    /// the active row group reader.
+    #[test]
+    fn test_decoder_clear_all_ranges() {
+        let mut decoder = 
ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
+            .unwrap()
+            .with_batch_size(100)
+            .build()
+            .unwrap();
+
+        decoder
+            .push_range(test_file_range(), TEST_FILE_DATA.clone())
+            .unwrap();
+        assert_eq!(decoder.buffered_bytes(), test_file_len());
+
+        // The current row group reader is built from the prefetched bytes, but
+        // the speculative full-file range remains staged in the decoder.
+        let batch1 = expect_data(decoder.try_decode());
+        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
+        assert_eq!(decoder.buffered_bytes(), test_file_len());
+
+        // All of the buffer is released
+        decoder.clear_all_ranges();
+        assert_eq!(decoder.buffered_bytes(), 0);
+
+        // The active reader still owns the current row group's bytes, so it 
can
+        // continue decoding without consulting PushBuffers.
+        let batch2 = expect_data(decoder.try_decode());
+        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
+        assert_eq!(decoder.buffered_bytes(), 0);
+
+        // Moving to the next row group now requires the decoder to ask for 
data
+        // again because the staged speculative ranges were released.
+        let ranges = expect_needs_data(decoder.try_decode());
+        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - 
r.start).sum();
+        push_ranges_to_decoder(&mut decoder, ranges);
+        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
+
+        let batch3 = expect_data(decoder.try_decode());
+        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
+        assert_eq!(decoder.buffered_bytes(), 0);
+
+        let batch4 = expect_data(decoder.try_decode());
+        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
+        assert_eq!(decoder.buffered_bytes(), 0);
+
+        expect_finished(decoder.try_decode());
+    }
+
     /// Decode the entire file incrementally, simulating partial reads
     #[test]
     fn test_decoder_partial() {
diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs 
b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
index d3d78ca7c2..922d8070c0 100644
--- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs
+++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -212,6 +212,11 @@ impl RowGroupReaderBuilder {
         self.buffers.buffered_bytes()
     }
 
+    /// Clear any staged ranges currently buffered for future decode work.
+    pub fn clear_all_ranges(&mut self) {
+        self.buffers.clear_all_ranges();
+    }
+
     /// take the current state, leaving None in its place.
     ///
     /// Returns an error if there the state wasn't put back after the previous
diff --git a/parquet/src/arrow/push_decoder/remaining.rs 
b/parquet/src/arrow/push_decoder/remaining.rs
index 4613fda087..2986ca0da8 100644
--- a/parquet/src/arrow/push_decoder/remaining.rs
+++ b/parquet/src/arrow/push_decoder/remaining.rs
@@ -70,6 +70,11 @@ impl RemainingRowGroups {
         self.row_group_reader_builder.buffered_bytes()
     }
 
+    /// Clear any staged ranges currently buffered for future decode work
+    pub fn clear_all_ranges(&mut self) {
+        self.row_group_reader_builder.clear_all_ranges();
+    }
+
     /// returns [`ParquetRecordBatchReader`] suitable for reading the next
     /// group of rows from the Parquet data, or the list of data ranges still
     /// needed to proceed
diff --git a/parquet/src/util/push_buffers.rs b/parquet/src/util/push_buffers.rs
index 0c00cf9bd5..eb4982fb3c 100644
--- a/parquet/src/util/push_buffers.rs
+++ b/parquet/src/util/push_buffers.rs
@@ -154,6 +154,13 @@ impl PushBuffers {
         self.ranges = new_ranges;
         self.buffers = new_buffers;
     }
+
+    /// Clear all buffered ranges and their corresponding data
+    #[cfg(feature = "arrow")]
+    pub fn clear_all_ranges(&mut self) {
+        self.ranges.clear();
+        self.buffers.clear();
+    }
 }
 
 impl Length for PushBuffers {

Reply via email to