zhuqi-lucas opened a new issue, #9934:
URL: https://github.com/apache/arrow-rs/issues/9934
## Is your feature request related to a problem or challenge?
> **Note**: This is a refined / scoped follow-up to #3922. To avoid the
> historical objections in that thread, this issue **explicitly does NOT
> request reverse-decoding of rows within a page** (which is indeed
> impractical due to RLE / dictionary / delta / bit-packing being forward
> streams). Instead, it requests **page-level reverse iteration**:
> traversing pages in reverse order while still decoding each page
> forward. See "Scope" below.
DataFusion has merged row-group-level reverse scan support
(apache/datafusion#18817) to optimize queries like
`ORDER BY sorted_col DESC LIMIT N` on ascending-sorted Parquet files.
The current implementation works at **row-group granularity** (~128MB),
which has two limitations:
1. **High first-batch latency**: must decode an entire row group
(~1M rows) before emitting the first reversed batch.
2. **High peak memory**: must buffer one full row group to reverse it.
For the **no-filter** case, `RowSelection::skip(rg_size - N).select(N)`
is sufficient, as already discussed in #3922. **However, this does not
work when there is a filter predicate.** Consider:
```sql
SELECT * FROM events
WHERE user_id = 42
ORDER BY ts DESC
LIMIT 10
```
We do not know which rows match `user_id = 42`, so we cannot
pre-compute a `RowSelection` for "the last 10 matches". The only
correct approach is **streaming pages in reverse order, decoding +
filtering each page, until 10 matches are accumulated**.
This is the killer use case for page-level reverse iteration.
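The control flow this implies can be sketched in pure Rust over an in-memory page model (the `(user_id, ts)` row shape and the `tail_scan` helper are illustrative only, not arrow-rs API):

```rust
// Hypothetical model: each "page" is a forward-decoded run of (user_id, ts)
// rows, with ts ascending across the file. We walk pages from the tail,
// reverse each decoded page, filter, and stop once `limit` matches are found.
fn tail_scan(pages: &[Vec<(u64, i64)>], user_id: u64, limit: usize) -> Vec<i64> {
    let mut out = Vec::with_capacity(limit);
    for page in pages.iter().rev() {          // page-level reverse iteration
        for &(uid, ts) in page.iter().rev() { // decode forward, then reverse
            if uid == user_id {
                out.push(ts);
                if out.len() == limit {
                    return out;               // early termination at LIMIT
                }
            }
        }
    }
    out
}

fn main() {
    let pages = vec![
        vec![(1, 1), (42, 2), (3, 3)],
        vec![(42, 4), (5, 5), (42, 6)],
        vec![(7, 7), (42, 8), (9, 9)],
    ];
    // Only the last two pages are touched for limit = 3.
    println!("{:?}", tail_scan(&pages, 42, 3)); // [8, 6, 4]
}
```

Note that only pages near the tail are decoded; with a 1% selective filter and `LIMIT 10`, the loop typically stops after a handful of pages instead of a full row group.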
## Scope (what this issue is and is NOT)
**This issue requests** (achievable today via `OffsetIndex`):
- Iterate `OffsetIndex.page_locations` **in reverse order**
- For each page, **decode forward as usual** (using existing
`SerializedPageReader` machinery)
- Reverse the resulting `RecordBatch` (or decoded values) at the page
boundary using existing kernels (`take` etc.)
- Apply `RowFilter` on each reversed page; emit and accumulate against
`LIMIT`
**This issue does NOT request**:
- Reverse-decoding rows *within* a single page (impossible due to
forward-only encodings — agreed with the analysis in #3922)
- Reordering row groups (already handled at the DataFusion layer)
- Any change to the on-disk Parquet format
## Why this is feasible
Page-level reverse iteration only needs random access to pages, which
the **`OffsetIndex` already provides** for any Parquet file written with
page indexes (the default for most modern writers, including arrow-rs).
`OffsetIndexMetaData::page_locations()` exposes
`Vec<PageLocation { offset, compressed_page_size, first_row_index }>`,
which is sufficient to seek directly to any page in any order.
The dictionary page is at the start of the column chunk and must be
read once before any data page — this is a known one-time cost, not a
blocker.
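To make the feasibility argument concrete, here is a minimal sketch of deriving reverse-order page byte ranges from the index. The `PageLocation` struct is defined locally so the example is self-contained; its fields mirror those of the parquet crate's `PageLocation`, but this is not the crate's actual type:

```rust
// Local stand-in whose fields mirror parquet's PageLocation (illustrative).
#[derive(Debug)]
struct PageLocation {
    offset: i64,               // byte offset of the page within the file
    compressed_page_size: i32, // length of the compressed page in bytes
    first_row_index: i64,      // row index of the first row in the page
}

// Yield (byte_offset, byte_len, first_row_index) for each page, last page
// first. Each tuple is enough to issue an independent seek + read.
fn reverse_page_ranges(
    locs: &[PageLocation],
) -> impl Iterator<Item = (u64, usize, i64)> + '_ {
    locs.iter().rev().map(|p| {
        (p.offset as u64, p.compressed_page_size as usize, p.first_row_index)
    })
}

fn main() {
    let locs = vec![
        PageLocation { offset: 100, compressed_page_size: 512, first_row_index: 0 },
        PageLocation { offset: 612, compressed_page_size: 480, first_row_index: 10_000 },
        PageLocation { offset: 1092, compressed_page_size: 300, first_row_index: 20_000 },
    ];
    for (off, len, row) in reverse_page_ranges(&locs) {
        println!("read bytes [{off}, {}) starting at row {row}", off + len as u64);
    }
}
```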
## Describe the solution you'd like
Two possible API surfaces (open to discussion):
### Option A: High-level builder flag
```rust
let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
.with_row_filter(filter)
.with_limit(10)
.with_reverse_pages(true) // <-- new
.build()?;
```
The builder reverse-traverses pages within each row group, applies
the filter, and emits `RecordBatch`es with rows already in reverse
order. `with_limit` causes early termination once N matches are
collected.
### Option B: Low-level `PageReader` variant
```rust
pub struct ReverseSerializedPageReader<R: ChunkReader> { ... }
impl<R: ChunkReader> ReverseSerializedPageReader<R> {
pub fn new(
reader: Arc<R>,
meta: &ColumnChunkMetaData,
offset_index: &OffsetIndexMetaData,
) -> Result<Self>;
}
impl<R: ChunkReader> PageReader for ReverseSerializedPageReader<R> { ... }
```
Option B is more flexible but pushes the orchestration burden onto
callers (DataFusion, etc.). My current preference is to start with B as
the foundation and layer A on top.
## Performance expectation
Compared to the current row-group-level reverse in DataFusion, on
`SELECT * FROM t WHERE rare_predicate ORDER BY ts DESC LIMIT 10`:
| Metric | Row-group reverse | Page reverse |
|---|---|---|
| First-batch latency | decode ~1M rows | decode ~1 page (~10K rows) |
| Peak buffer memory | ~128MB (1 row group) | ~1MB (1 page) |
| Wasted decode work (filter selectivity 1%) | ~1M rows for 10 matches | scan from tail until 10 matches |
Concrete benchmarks will follow once a POC is in place.
## Describe alternatives I've considered
1. **`RowSelection::skip(N).select(M)`** — only works when there is no
   filter; it does not solve the filter + reverse case.
2. **Row-group-level reverse** — already done at DataFusion layer
(apache/datafusion#18817). Works but has the latency / memory
limitations described above.
3. **Read entire file, sort in memory** — current default. Defeats the
purpose of having sorted Parquet files.
## Additional context
- Related: #3922 (broader reverse-stream request)
- DataFusion umbrella: apache/datafusion#17172
- DataFusion follow-up: apache/datafusion#19486
- DataFusion row-group reverse PR (merged): apache/datafusion#18817
I'm happy to drive the implementation if there is consensus on
direction.