alamb commented on code in PR #9697:
URL: https://github.com/apache/arrow-rs/pull/9697#discussion_r3081487227


##########
parquet/src/util/push_buffers.rs:
##########
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
     ranges: Vec<Range<u64>>,
     /// The buffers of data that can be used to decode the Parquet file
     buffers: Vec<Bytes>,
+    /// High-water mark set by [`Self::release_through`].  After a release,
+    /// no push, has_range, or read may target offsets below this value.

Review Comment:
   It is probably good to point out what "may not" means (for example, does 
the code panic if this is attempted?)
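
   One way to make the contract explicit is to return an error instead of 
panicking. The following is only a sketch: `Watermark`, `WatermarkError`, and 
`check` are hypothetical names and are not part of this PR.

```rust
use std::ops::Range;

/// Hypothetical error type for illustration only.
#[derive(Debug, PartialEq)]
pub enum WatermarkError {
    BelowWatermark { requested: u64, watermark: u64 },
}

/// Hypothetical stand-in for the watermark bookkeeping in `PushBuffers`.
pub struct Watermark {
    released_through: u64,
}

impl Watermark {
    pub fn new() -> Self {
        Self { released_through: 0 }
    }

    /// Advance the high-water mark; it never moves backwards.
    pub fn release_through(&mut self, offset: u64) {
        self.released_through = self.released_through.max(offset);
    }

    /// Returns an error (rather than panicking) when `range` starts below
    /// the watermark, so the documented behavior is visible in the signature.
    pub fn check(&self, range: &Range<u64>) -> Result<(), WatermarkError> {
        if range.start < self.released_through {
            return Err(WatermarkError::BelowWatermark {
                requested: range.start,
                watermark: self.released_through,
            });
        }
        Ok(())
    }
}
```

   Whichever behavior is chosen (panic, error, or silently ignoring the 
request), stating it in the doc comment would answer the question above.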



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -713,6 +713,21 @@ impl RowGroupMetaData {
         self.file_offset
     }
 
+    /// Returns the byte offset just past the last column chunk in this row 
group.

Review Comment:
   I think practically speaking most parquet files will have the column chunks 
for one row group written contiguously in the file, but I am not sure the spec 
requires this. I do think it effectively requires all pages for a column to be 
in a contiguous range
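
   If contiguity is not guaranteed, the end offset could be taken as the 
maximum over all column chunks rather than from the last one. A minimal 
sketch, where `ChunkLocation` and `covering_range` are hypothetical names used 
only for illustration:

```rust
use std::ops::Range;

/// Hypothetical, simplified stand-in for a column chunk's location in the file.
pub struct ChunkLocation {
    pub start: u64,
    pub len: u64,
}

/// Smallest byte range that covers every chunk, or `None` if there are none.
/// Does not assume the chunks are contiguous or written in order.
pub fn covering_range(chunks: &[ChunkLocation]) -> Option<Range<u64>> {
    let start = chunks.iter().map(|c| c.start).min()?;
    let end = chunks.iter().map(|c| c.start + c.len).max()?;
    Some(start..end)
}
```

   Note that if the chunks are not contiguous, this covering range may also 
include bytes that belong to other row groups.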



##########
parquet/src/util/push_buffers.rs:
##########
@@ -82,83 +100,166 @@ impl PushBuffers {
             file_len,
             ranges: Vec::new(),
             buffers: Vec::new(),
+            #[cfg(feature = "arrow")]
+            watermark: 0,
+            sorted: true,
         }
     }
 
-    /// Push all the ranges and buffers
+    /// Restore the sort invariant on `ranges`/`buffers`.
+    ///
+    /// Because IO completions are expected to generally arrive in-order,
+    /// `push_range` appends without sorting. We instead delay sorting until
+    /// consumption to amortize its cost, if necessary.
+    ///
+    /// This method must be called before any read-side operation that relies 
on
+    /// binary search (`has_range`, `get_bytes`, `release_through`,
+    /// `Read::read`). Callers that hold `&mut PushBuffers` should call this
+    /// once before lending `&PushBuffers` to read-side code.
+    pub fn ensure_sorted(&mut self) {
+        if self.sorted {
+            return;
+        }
+
+        // Insertion sort: zero-allocation and linear on nearly-sorted input

Review Comment:
   This is O(n^2) on reverse-sorted input though, right?
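
   To illustrate the concern, here is a small standalone sketch (not the PR's 
code) that counts how many swaps a plain insertion sort performs. 
`insertion_sort_counting` is a hypothetical helper:

```rust
/// Insertion sort that returns the number of swaps performed.
/// Each swap removes exactly one inversion from the input.
pub fn insertion_sort_counting(v: &mut [u64]) -> usize {
    let mut swaps = 0;
    for i in 1..v.len() {
        let mut j = i;
        // Walk the new element left until it is in place.
        while j > 0 && v[j - 1] > v[j] {
            v.swap(j - 1, j);
            swaps += 1;
            j -= 1;
        }
    }
    swaps
}
```

   On already-sorted input this performs no swaps, while on reverse-sorted 
input of length n it performs n(n-1)/2 swaps, one per inversion.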



##########
parquet/src/util/push_buffers.rs:
##########
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
     ranges: Vec<Range<u64>>,
     /// The buffers of data that can be used to decode the Parquet file
     buffers: Vec<Bytes>,
+    /// High-water mark set by [`Self::release_through`].  After a release,
+    /// no push, has_range, or read may target offsets below this value.
+    #[cfg(feature = "arrow")]
+    watermark: u64,
+    /// Whether `ranges`/`buffers` are sorted by range start.
+    /// Set to `false` on every `push_range`, restored lazily before reads.
+    sorted: bool,

Review Comment:
   What if we encoded the sort invariant in the type system rather than relying 
on the flag to be set correctly? Something like 
   
   ```rust
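   enum Buffers {
     // Invariant: `ranges` is sorted by range start
     Sorted {
       ranges: Vec<Range<u64>>,
       buffers: Vec<Bytes>,
     },
     // No ordering guarantee; must be sorted before any read
     Unsorted {
       ranges: Vec<Range<u64>>,
       buffers: Vec<Bytes>,
     },
   }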
   ```
   
   Maybe it is overly complicated, but it would make it much clearer that all 
paths correctly update the sorting
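
   A slightly fuller sketch of the idea, to show how the transitions might 
look. Everything here is hypothetical: `Entry`, `push`, and `into_sorted` are 
illustrative names, and `Vec<u8>` stands in for `Bytes` to keep the example 
free of external crates.

```rust
use std::ops::Range;

/// Hypothetical entry type; `Vec<u8>` stands in for `bytes::Bytes`.
type Entry = (Range<u64>, Vec<u8>);

pub enum Buffers {
    Sorted(Vec<Entry>),
    Unsorted(Vec<Entry>),
}

impl Buffers {
    pub fn new() -> Self {
        Buffers::Sorted(Vec::new())
    }

    /// Appending keeps the `Sorted` state only when the new range starts at
    /// or after the start of the last range; otherwise the state degrades.
    pub fn push(self, range: Range<u64>, data: Vec<u8>) -> Self {
        match self {
            Buffers::Sorted(mut v) => {
                let in_order = v.last().map_or(true, |(r, _)| r.start <= range.start);
                v.push((range, data));
                if in_order {
                    Buffers::Sorted(v)
                } else {
                    Buffers::Unsorted(v)
                }
            }
            Buffers::Unsorted(mut v) => {
                v.push((range, data));
                Buffers::Unsorted(v)
            }
        }
    }

    /// The only way to get entries out for reading: they are always sorted.
    pub fn into_sorted(self) -> Vec<Entry> {
        match self {
            Buffers::Sorted(v) => v,
            Buffers::Unsorted(mut v) => {
                v.sort_by_key(|(r, _)| r.start);
                v
            }
        }
    }
}
```

   With this shape, read-side code cannot observe unsorted data, because 
there is no accessor that skips the sort.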



##########
parquet/src/util/push_buffers.rs:
##########
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
     ranges: Vec<Range<u64>>,

Review Comment:
   If the goal is to keep a list sorted by start range, did you consider using 
a `BTreeSet`? You could then define some sort of wrapper over Range/Bytes like
   
   ```rust
   struct RangeAndData {
     range: Range<u64>,
     buffer: Bytes
   }
   // BTreeSet requires `Ord` (plus `PartialOrd`, `PartialEq`, and `Eq`)
   impl Ord for RangeAndData {
     // define comparison from start range
   }
   
   pub(crate) struct PushBuffers {
   ...
       buffers: BTreeSet<RangeAndData>,
   }
   ```
   
   That would probably simplify the accounting significantly
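
   A closely related option is a `BTreeMap` keyed by range start, which avoids 
the custom ordering impls. This is only a sketch under assumptions: 
`SortedBuffers` is a hypothetical name, `Vec<u8>` stands in for `Bytes`, and 
this `has_range` only checks coverage by a single buffer (no stitching).

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical sketch; `Vec<u8>` stands in for `bytes::Bytes`.
#[derive(Default)]
pub struct SortedBuffers {
    /// Keyed by range start; the value is (range end, data).
    by_start: BTreeMap<u64, (u64, Vec<u8>)>,
}

impl SortedBuffers {
    /// Insert a buffer. A buffer with the same start replaces the old one.
    pub fn push(&mut self, range: Range<u64>, data: Vec<u8>) {
        self.by_start.insert(range.start, (range.end, data));
    }

    /// True if a single buffer fully covers `range`.
    pub fn has_range(&self, range: &Range<u64>) -> bool {
        // Find the buffer with the greatest start <= range.start.
        self.by_start
            .range(..=range.start)
            .next_back()
            .is_some_and(|(_, (end, _))| *end >= range.end)
    }
}
```

   The map stays ordered on every insert, so no separate `sorted` flag or 
`ensure_sorted` call is needed in this variant.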



##########
parquet/src/util/push_buffers.rs:
##########
@@ -82,83 +100,166 @@ impl PushBuffers {
             file_len,
             ranges: Vec::new(),
             buffers: Vec::new(),
+            #[cfg(feature = "arrow")]
+            watermark: 0,
+            sorted: true,
         }
     }
 
-    /// Push all the ranges and buffers
+    /// Restore the sort invariant on `ranges`/`buffers`.
+    ///
+    /// Because IO completions are expected to generally arrive in-order,
+    /// `push_range` appends without sorting. We instead delay sorting until
+    /// consumption to amortize its cost, if necessary.
+    ///
+    /// This method must be called before any read-side operation that relies 
on
+    /// binary search (`has_range`, `get_bytes`, `release_through`,
+    /// `Read::read`). Callers that hold `&mut PushBuffers` should call this
+    /// once before lending `&PushBuffers` to read-side code.
+    pub fn ensure_sorted(&mut self) {

Review Comment:
   does it need to be pub?



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -212,9 +212,15 @@ impl RowGroupReaderBuilder {
         self.buffers.buffered_bytes()
     }
 
-    /// Clear any staged ranges currently buffered for future decode work.
-    pub fn clear_all_ranges(&mut self) {
-        self.buffers.clear_all_ranges();
+    /// Release all staged ranges currently buffered for future decode work.

Review Comment:
   I think the clear_all_ranges API was introduced in 
https://github.com/apache/arrow-rs/pull/9673 by @nathanb9, and since we 
haven't released that yet, this isn't a breaking API change
   
   



##########
parquet/src/util/push_buffers.rs:
##########
@@ -26,18 +26,29 @@ use std::ops::Range;
 /// This is the in-memory buffer for the ParquetDecoder and 
ParquetMetadataDecoders
 ///
 /// Features:
-/// 1. Zero copy
-/// 2. non contiguous ranges of bytes
+/// 1. Non-contiguous ranges of bytes
+/// 2. Stitching: reads that span multiple contiguous physical buffers are
+///    resolved transparently. When a single buffer covers the request, the
+///    result is zero-copy ([`Bytes::slice`]). When multiple buffers must be
+///    stitched, the data is copied into a new allocation.
 ///
-/// # Non Coalescing
+/// # No Coalescing
 ///
-/// This buffer does not coalesce  (merging adjacent ranges of bytes into a
-/// single range). Coalescing at this level would require copying the data but
-/// the caller may already have the needed data in a single buffer which would
-/// require no copying.
+/// This buffer does not coalesce (merging adjacent ranges of bytes into a 
single
+/// range). The IO layer is free to push arbitrarily-sized buffers; they will 
be
+/// stitched on read if needed. Coalescing is left to the IO layer because it
+/// would require an extra copy here, and because the optimal coalescing
+/// strategy depends on the workload and storage medium (e.g. spinning disk,
+/// NVMe, blob storage), context that only the IO layer has.
 ///
-/// Thus, the implementation defers to the caller to coalesce subsequent 
requests
-/// if desired.
+/// # No Speculative Prefetching

Review Comment:
   👍 



##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -610,6 +617,12 @@ impl RowGroupReaderBuilder {
                     &mut self.buffers,
                 )?;
 
+                // All data for this row group has been extracted into the
+                // InMemoryRowGroup.  Release physical buffers up to the end
+                // of this row group so streaming IO can reclaim memory.
+                self.buffers

Review Comment:
   You can configure the parquet reader to read row groups in some arbitrary 
order using 
[with_row_groups](https://docs.rs/parquet/58.0.0/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.with_row_groups)
 
   
   Also technically there is no reason that row groups have to be written in 
order (though most writers will do that) -- for example, you could have a file 
where the bytes for row group 0 are after the bytes for row group 1. 
   
   So I think assuming that the reader will never want any bytes prior to the 
current row group should be reconsidered.
   
   Can we instead release only the data between the start and end of the row 
group, rather than everything up to a one-sided bound?
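
   Roughly what I have in mind, as a sketch only: `release_range` is a 
hypothetical free function, `Vec<u8>` stands in for `Bytes`, and buffers that 
only partially overlap the released range are simply kept.

```rust
use std::ops::Range;

/// Drop every buffer fully contained in `released` and return the number
/// of bytes freed. Buffers outside (or only partially inside) are kept,
/// and the relative order of the kept buffers is preserved.
pub fn release_range(
    ranges: &mut Vec<Range<u64>>,
    buffers: &mut Vec<Vec<u8>>,
    released: &Range<u64>,
) -> usize {
    let mut freed = 0;
    let mut keep_idx = 0;
    for i in 0..ranges.len() {
        let inside = released.start <= ranges[i].start && ranges[i].end <= released.end;
        if inside {
            freed += buffers[i].len();
        } else {
            // Compact kept entries towards the front.
            ranges.swap(keep_idx, i);
            buffers.swap(keep_idx, i);
            keep_idx += 1;
        }
    }
    ranges.truncate(keep_idx);
    buffers.truncate(keep_idx);
    freed
}
```

   With a two-sided release like this, bytes for a row group that sits 
earlier in the file but is decoded later would not be discarded.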
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
