alamb commented on code in PR #9697:
URL: https://github.com/apache/arrow-rs/pull/9697#discussion_r3081487227
##########
parquet/src/util/push_buffers.rs:
##########
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
ranges: Vec<Range<u64>>,
/// The buffers of data that can be used to decode the Parquet file
buffers: Vec<Bytes>,
+ /// High-water mark set by [`Self::release_through`]. After a release,
+ /// no push, has_range, or read may target offsets below this value.
Review Comment:
it is probably good to point out what "may not" means (e.g. does the code panic if it is attempted?)
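One way to answer the "may not" question is to make the contract explicit in code. The sketch below is hypothetical: `Watermarked`, `WatermarkError`, and `check_offset` are not names from the PR. It assumes that an access below the watermark should be reported as an error rather than a panic, and that `release_through` only ever moves the watermark forward.

```rust
use std::fmt;

/// Hypothetical error for an access that targets already-released bytes.
#[derive(Debug, PartialEq)]
struct WatermarkError {
    offset: u64,
    watermark: u64,
}

impl fmt::Display for WatermarkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "offset {} is below the release watermark {}", self.offset, self.watermark)
    }
}

/// Hypothetical stand-in for the watermark part of `PushBuffers`.
struct Watermarked {
    watermark: u64,
}

impl Watermarked {
    /// Advances the watermark; a smaller offset leaves it unchanged.
    fn release_through(&mut self, offset: u64) {
        self.watermark = self.watermark.max(offset);
    }

    /// Rejects a push or read that starts below the watermark.
    fn check_offset(&self, offset: u64) -> Result<(), WatermarkError> {
        if offset < self.watermark {
            Err(WatermarkError { offset, watermark: self.watermark })
        } else {
            Ok(())
        }
    }
}
```

Whether a violation returns an error, trips a `debug_assert!`, or panics is exactly the choice the doc comment should spell out.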
##########
parquet/src/file/metadata/mod.rs:
##########
@@ -713,6 +713,21 @@ impl RowGroupMetaData {
self.file_offset
}
+ /// Returns the byte offset just past the last column chunk in this row group.
Review Comment:
I think, practically speaking, most parquet files will have the column chunks
for one row group written contiguously in the file, but I am not sure the spec
requires this. I do think it effectively requires all pages for a column to be
in a contiguous range.
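If contiguity is not guaranteed, the row group's byte span can still be computed without assuming it, by taking the minimum start and maximum end over all column chunks. A minimal sketch over plain `(start, length)` pairs (the parquet crate's `ColumnChunkMetaData::byte_range` returns a similar pair); `row_group_span` and `is_contiguous` are hypothetical helper names:

```rust
use std::ops::Range;

/// Hypothetical helper: the smallest byte range covering every column chunk,
/// given each chunk as a `(start, length)` pair. `None` if there are no chunks.
fn row_group_span(chunks: &[(u64, u64)]) -> Option<Range<u64>> {
    let start = chunks.iter().map(|&(s, _)| s).min()?;
    let end = chunks.iter().map(|&(s, len)| s + len).max()?;
    Some(start..end)
}

/// Hypothetical helper: true if, once sorted by start offset, each chunk
/// begins exactly where the previous one ends (no gaps, no overlaps).
fn is_contiguous(chunks: &[(u64, u64)]) -> bool {
    let mut sorted = chunks.to_vec();
    sorted.sort_by_key(|&(s, _)| s);
    sorted.windows(2).all(|w| w[0].0 + w[0].1 == w[1].0)
}
```

Note that when the chunks are not contiguous, the span also covers bytes that belong to something else, so "just past the last column chunk" is only a safe release point if nothing else lives in those gaps.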
##########
parquet/src/util/push_buffers.rs:
##########
@@ -82,83 +100,166 @@ impl PushBuffers {
file_len,
ranges: Vec::new(),
buffers: Vec::new(),
+ #[cfg(feature = "arrow")]
+ watermark: 0,
+ sorted: true,
}
}
- /// Push all the ranges and buffers
+ /// Restore the sort invariant on `ranges`/`buffers`.
+ ///
+ /// Because IO completions are expected to generally arrive in-order,
+ /// `push_range` appends without sorting. We instead delay sorting until
+ /// consumption to amortize its cost, if necessary.
+ ///
+ /// This method must be called before any read-side operation that relies on
+ /// binary search (`has_range`, `get_bytes`, `release_through`,
+ /// `Read::read`). Callers that hold `&mut PushBuffers` should call this
+ /// once before lending `&PushBuffers` to read-side code.
+ pub fn ensure_sorted(&mut self) {
+ if self.sorted {
+ return;
+ }
+
+ // Insertion sort: zero-allocation and linear on nearly-sorted input
Review Comment:
this is O(n^2) on reverse-sorted input though, right?
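To make the cost concrete: an insertion sort that swaps adjacent elements performs one swap per inverted pair, so already-sorted input needs zero swaps while fully reversed input of length n needs n(n-1)/2. The sketch below counts swaps on parallel `ranges`/`buffers` vectors (generic `T` stands in for `Bytes`) and, as one alternative, sorts zipped pairs with the standard library's stable `sort_by_key`, which is O(n log n) in the worst case and fast on nearly sorted input, at the cost of temporary allocations. Both function names are hypothetical.

```rust
use std::ops::Range;

/// Insertion sort over parallel vectors, keyed by range start.
/// Returns the number of adjacent swaps, which equals the number of inversions.
fn insertion_sort_parallel<T>(ranges: &mut [Range<u64>], buffers: &mut [T]) -> usize {
    assert_eq!(ranges.len(), buffers.len());
    let mut swaps = 0;
    for i in 1..ranges.len() {
        let mut j = i;
        // Move element i left until its predecessor starts no later than it.
        while j > 0 && ranges[j - 1].start > ranges[j].start {
            ranges.swap(j - 1, j);
            buffers.swap(j - 1, j);
            swaps += 1;
            j -= 1;
        }
    }
    swaps
}

/// Alternative: O(n log n) worst case via the standard stable sort,
/// at the cost of allocating a temporary vector of pairs.
fn sort_parallel<T>(ranges: &mut Vec<Range<u64>>, buffers: &mut Vec<T>) {
    assert_eq!(ranges.len(), buffers.len());
    let mut pairs: Vec<(Range<u64>, T)> = std::mem::take(ranges)
        .into_iter()
        .zip(std::mem::take(buffers))
        .collect();
    pairs.sort_by_key(|(r, _)| r.start);
    let (r, b): (Vec<Range<u64>>, Vec<T>) = pairs.into_iter().unzip();
    *ranges = r;
    *buffers = b;
}
```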
##########
parquet/src/util/push_buffers.rs:
##########
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
ranges: Vec<Range<u64>>,
/// The buffers of data that can be used to decode the Parquet file
buffers: Vec<Bytes>,
+ /// High-water mark set by [`Self::release_through`]. After a release,
+ /// no push, has_range, or read may target offsets below this value.
+ #[cfg(feature = "arrow")]
+ watermark: u64,
+ /// Whether `ranges`/`buffers` are sorted by range start.
+ /// Set to `false` on every `push_range`, restored lazily before reads.
+ sorted: bool,
Review Comment:
What if we encoded the sort invariant in the type system rather than relying
on the flag to be set correctly? Something like
```rust
enum Buffers {
    Sorted {
        ranges: Vec<Range<u64>>,
        buffers: Vec<Bytes>,
    },
    Unsorted {
        ranges: Vec<Range<u64>>,
        buffers: Vec<Bytes>,
    },
}
```
Maybe it is overly complicated, but it makes it much clearer that all paths
correctly maintain the sort order.
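A minimal sketch of that idea, assuming generic `T` in place of `Bytes` and hypothetical method names (`push`, `sorted`, `take_parts`): the only way for read-side code to reach the ranges is through `sorted()`, which sorts on demand and then hands out slices, so no read path can forget to restore the invariant.

```rust
use std::mem;
use std::ops::Range;

/// Hypothetical type-state version of the buffer storage.
enum Buffers<T> {
    Sorted { ranges: Vec<Range<u64>>, buffers: Vec<T> },
    Unsorted { ranges: Vec<Range<u64>>, buffers: Vec<T> },
}

impl<T> Buffers<T> {
    fn new() -> Self {
        Buffers::Sorted { ranges: Vec::new(), buffers: Vec::new() }
    }

    /// Moves both vectors out, leaving an empty `Sorted` value behind.
    fn take_parts(&mut self) -> (Vec<Range<u64>>, Vec<T>) {
        match mem::replace(self, Self::new()) {
            Buffers::Sorted { ranges, buffers } | Buffers::Unsorted { ranges, buffers } => {
                (ranges, buffers)
            }
        }
    }

    /// Appending never sorts; it always transitions to `Unsorted`.
    fn push(&mut self, range: Range<u64>, buffer: T) {
        let (mut ranges, mut buffers) = self.take_parts();
        ranges.push(range);
        buffers.push(buffer);
        *self = Buffers::Unsorted { ranges, buffers };
    }

    /// The only read accessor: sorts if needed, then exposes sorted slices.
    fn sorted(&mut self) -> (&[Range<u64>], &[T]) {
        if matches!(self, Buffers::Unsorted { .. }) {
            let (ranges, buffers) = self.take_parts();
            let mut pairs: Vec<_> = ranges.into_iter().zip(buffers).collect();
            pairs.sort_by_key(|(r, _)| r.start);
            let (ranges, buffers): (Vec<Range<u64>>, Vec<T>) = pairs.into_iter().unzip();
            *self = Buffers::Sorted { ranges, buffers };
        }
        match self {
            Buffers::Sorted { ranges, buffers } => (ranges.as_slice(), buffers.as_slice()),
            Buffers::Unsorted { .. } => unreachable!("sorted just above"),
        }
    }
}
```

The cost is some `mem::replace` shuffling on every push; the benefit is that "forgot to sort before reading" becomes unrepresentable rather than a convention.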
##########
parquet/src/util/push_buffers.rs:
##########
@@ -48,6 +59,13 @@ pub(crate) struct PushBuffers {
ranges: Vec<Range<u64>>,
Review Comment:
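If the goal is to keep a list sorted by start range, did you consider using
a `BTreeSet`? You could then define some sort of wrapper over Range/Bytes like
```rust
use std::cmp::Ordering;
use std::collections::BTreeSet;
struct RangeAndData {
    range: Range<u64>,
    buffer: Bytes,
}
// BTreeSet needs `Ord` + `Eq`; order by start, then end, so equal starts are not deduplicated
impl Ord for RangeAndData {
    fn cmp(&self, other: &Self) -> Ordering {
        (self.range.start, self.range.end).cmp(&(other.range.start, other.range.end))
    }
}
impl PartialOrd for RangeAndData {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> { Some(self.cmp(other)) }
}
impl PartialEq for RangeAndData {
    fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal }
}
impl Eq for RangeAndData {}
pub(crate) struct PushBuffers {
    ...
    buffers: BTreeSet<RangeAndData>,
}
```
That would probably simplify the accounting significantly.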
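A close variant of the `BTreeSet` suggestion is a `BTreeMap` keyed by range start, which lets a lookup by file offset use `range(..=offset)` directly instead of building a probe value (with a dummy buffer) to search with. A minimal sketch, assuming ranges never overlap and no two pushes share a start offset (a second insert with the same start replaces the first); `RangeStore`, `push`, and `find` are hypothetical names, and generic `T` stands in for `Bytes`:

```rust
use std::collections::BTreeMap;
use std::ops::Range;

/// Hypothetical store: buffers keyed by their starting file offset.
struct RangeStore<T> {
    by_start: BTreeMap<u64, (u64, T)>, // start -> (end, data)
}

impl<T> RangeStore<T> {
    fn new() -> Self {
        Self { by_start: BTreeMap::new() }
    }

    /// Inserts in any order; the map keeps entries ordered by start.
    fn push(&mut self, range: Range<u64>, data: T) {
        self.by_start.insert(range.start, (range.end, data));
    }

    /// Returns the buffer whose range contains `offset`, if any.
    /// Only the last entry starting at or before `offset` is checked,
    /// which is sufficient because ranges are assumed not to overlap.
    fn find(&self, offset: u64) -> Option<(Range<u64>, &T)> {
        let (&start, (end, data)) = self.by_start.range(..=offset).next_back()?;
        (offset < *end).then(|| (start..*end, data))
    }
}
```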
##########
parquet/src/util/push_buffers.rs:
##########
@@ -82,83 +100,166 @@ impl PushBuffers {
file_len,
ranges: Vec::new(),
buffers: Vec::new(),
+ #[cfg(feature = "arrow")]
+ watermark: 0,
+ sorted: true,
}
}
- /// Push all the ranges and buffers
+ /// Restore the sort invariant on `ranges`/`buffers`.
+ ///
+ /// Because IO completions are expected to generally arrive in-order,
+ /// `push_range` appends without sorting. We instead delay sorting until
+ /// consumption to amortize its cost, if necessary.
+ ///
+ /// This method must be called before any read-side operation that relies on
+ /// binary search (`has_range`, `get_bytes`, `release_through`,
+ /// `Read::read`). Callers that hold `&mut PushBuffers` should call this
+ /// once before lending `&PushBuffers` to read-side code.
+ pub fn ensure_sorted(&mut self) {
Review Comment:
does it need to be pub?
##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -212,9 +212,15 @@ impl RowGroupReaderBuilder {
self.buffers.buffered_bytes()
}
- /// Clear any staged ranges currently buffered for future decode work.
- pub fn clear_all_ranges(&mut self) {
- self.buffers.clear_all_ranges();
+ /// Release all staged ranges currently buffered for future decode work.
Review Comment:
I think the clear_all_ranges API was introduced in
https://github.com/apache/arrow-rs/pull/9673 by @nathanb9, and since we
haven't released that yet, this change isn't a breaking API change.
##########
parquet/src/util/push_buffers.rs:
##########
@@ -26,18 +26,29 @@ use std::ops::Range;
/// This is the in-memory buffer for the ParquetDecoder and ParquetMetadataDecoders
///
/// Features:
-/// 1. Zero copy
-/// 2. non contiguous ranges of bytes
+/// 1. Non-contiguous ranges of bytes
+/// 2. Stitching: reads that span multiple contiguous physical buffers are
+/// resolved transparently. When a single buffer covers the request, the
+/// result is zero-copy ([`Bytes::slice`]). When multiple buffers must be
+/// stitched, the data is copied into a new allocation.
///
-/// # Non Coalescing
+/// # No Coalescing
///
-/// This buffer does not coalesce (merging adjacent ranges of bytes into a
-/// single range). Coalescing at this level would require copying the data but
-/// the caller may already have the needed data in a single buffer which would
-/// require no copying.
+/// This buffer does not coalesce (merging adjacent ranges of bytes into a single
+/// range). The IO layer is free to push arbitrarily-sized buffers; they will be
+/// stitched on read if needed. Coalescing is left to the IO layer because it
+/// would require an extra copy here, and because the optimal coalescing
+/// strategy depends on the workload and storage medium (e.g. spinning disk,
+/// NVMe, blob storage), context that only the IO layer has.
///
-/// Thus, the implementation defers to the caller to coalesce subsequent requests
-/// if desired.
+/// # No Speculative Prefetching
Review Comment:
👍
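The zero-copy-or-stitch behavior described in the new doc comment can be illustrated without the `bytes` crate by returning a `Cow<[u8]>`: `Cow::Borrowed` when a single buffer covers the request (the analogue of `Bytes::slice`) and `Cow::Owned` when pieces must be copied into a new allocation. This is a sketch under stated assumptions, not the PR's implementation: `read_range` is a hypothetical name, entries are assumed sorted by start and non-overlapping, each `buffers[i].len()` equals the length of `ranges[i]`, and the requested range is non-empty.

```rust
use std::borrow::Cow;
use std::ops::Range;

/// Hypothetical read: `ranges[i]` describes the file bytes held in `buffers[i]`.
fn read_range<'a>(
    ranges: &[Range<u64>],
    buffers: &'a [Vec<u8>],
    want: Range<u64>,
) -> Option<Cow<'a, [u8]>> {
    // Index of the last buffer that starts at or before `want.start`.
    let first = ranges.partition_point(|r| r.start <= want.start).checked_sub(1)?;
    let r = &ranges[first];
    if want.start >= r.end {
        return None; // `want.start` falls in a gap
    }

    // Fast path: one buffer covers the whole request, so borrow a sub-slice.
    if want.end <= r.end {
        let lo = (want.start - r.start) as usize;
        let hi = (want.end - r.start) as usize;
        return Some(Cow::Borrowed(&buffers[first][lo..hi]));
    }

    // Slow path: stitch consecutive buffers into a new allocation.
    let mut out = Vec::with_capacity((want.end - want.start) as usize);
    let mut pos = want.start;
    for (r, buf) in ranges[first..].iter().zip(&buffers[first..]) {
        if r.start > pos {
            return None; // gap between buffers
        }
        let take_end = want.end.min(r.end);
        out.extend_from_slice(&buf[(pos - r.start) as usize..(take_end - r.start) as usize]);
        pos = take_end;
        if pos == want.end {
            return Some(Cow::Owned(out));
        }
    }
    None // ran out of buffers before reaching `want.end`
}
```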
##########
parquet/src/arrow/push_decoder/reader_builder/mod.rs:
##########
@@ -610,6 +617,12 @@ impl RowGroupReaderBuilder {
&mut self.buffers,
)?;
+ // All data for this row group has been extracted into the
+ // InMemoryRowGroup. Release physical buffers up to the end
+ // of this row group so streaming IO can reclaim memory.
+ self.buffers
Review Comment:
You can configure the parquet reader to read row groups in an arbitrary order with
[with_row_groups](https://docs.rs/parquet/58.0.0/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.with_row_groups).
Also technically there is no reason that row groups have to be written in
order (though most writers will do that) -- for example, you could have a file
where the bytes for row group 0 are after the bytes for row group 1.
So I think assuming that the reader will never want any bytes prior to the
current row group should be reconsidered.
Can we instead release only the data between the start and end of the row
group, rather than using a one-sided range?
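A two-sided release could drop only the buffers lying entirely inside the finished row group's byte span and leave everything before and after it alone, so reading row groups in a custom order, or a file whose row groups are not laid out in order, stays correct. A minimal sketch over parallel vectors with generic `T` in place of `Bytes`; `release_range` is a hypothetical name, and a buffer that only partially overlaps the span is kept whole rather than trimmed.

```rust
use std::ops::Range;

/// Hypothetical two-sided release: removes every buffer whose range lies
/// entirely within `span`, keeping `ranges` and `buffers` aligned and in order.
/// Buffers that only partially overlap `span` are kept. Returns how many were removed.
fn release_range<T>(ranges: &mut Vec<Range<u64>>, buffers: &mut Vec<T>, span: &Range<u64>) -> usize {
    assert_eq!(ranges.len(), buffers.len());
    // Decide once per entry, then apply the same mask to both vectors.
    let keep: Vec<bool> = ranges
        .iter()
        .map(|r| !(span.start <= r.start && r.end <= span.end))
        .collect();
    // `Vec::retain` visits elements exactly once, in order, so the masks line up.
    let mut mask = keep.iter();
    ranges.retain(|_| *mask.next().unwrap());
    let mut mask = keep.iter();
    buffers.retain(|_| *mask.next().unwrap());
    keep.iter().filter(|k| !**k).count()
}
```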