zhuqi-lucas commented on code in PR #18817:
URL: https://github.com/apache/datafusion/pull/18817#discussion_r2581146514
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -752,10 +799,316 @@ fn should_enable_page_index(
.unwrap_or(false)
}
+fn reverse_batch(batch: RecordBatch) -> Result<RecordBatch> {
+    let num_rows = batch.num_rows();
+    if num_rows <= 1 {
+        return Ok(batch);
+    }
+
+    let indices = UInt32Array::from_iter_values((0..num_rows as u32).rev());
+
+    let reversed_columns = batch
+        .columns()
+        .iter()
+        .map(|col| take(col.as_ref(), &indices, None))
+        .collect::<std::result::Result<Vec<ArrayRef>, arrow::error::ArrowError>>()
+        .map_err(DataFusionError::from)?;
+
+    RecordBatch::try_new(batch.schema(), reversed_columns).map_err(DataFusionError::from)
+}
+
+/// Stream adapter for reversed parquet reading with row-group-level buffering.
+///
+/// # Architecture
+///
+/// This stream implements a sophisticated buffering strategy to achieve true reverse
+/// reading of Parquet files while maintaining compatibility with the underlying
+/// ParquetRecordBatchStream's optimizations (caching, prefetching, etc.).
+///
+/// ## Strategy Overview
+///
+/// 1. **Pre-reversed Row Groups**: Row groups are reversed BEFORE building the stream
+///    (via `row_group_indexes.reverse()`). This allows the Parquet reader to read
+///    them in reverse order while still utilizing internal optimizations.
+///
+/// 2. **Row-Group-Level Buffering**: As batches arrive from the input stream, we
+///    track which row group they belong to using cumulative row counts. This is
+///    the MINIMAL buffering unit required for correctness - we cannot reverse
+///    individual batches without knowing the complete row group context.
+///
+/// 3. **Two-Stage Reversal**: When a complete row group is collected:
+///    - Stage 1: Reverse rows within each batch (using Arrow's take kernel)
+///    - Stage 2: Reverse the order of batches within the row group
+///
+/// 4. **Progressive Output**: Reversed batches are output immediately, minimizing
+///    memory footprint. We never buffer more than one row group at a time.
+///
+/// ## Memory Characteristics
+///
+/// - **Bounded Memory**: Maximum memory usage = size of largest row group
+/// - **Typical Usage**: ~128MB (default Parquet row group size)
+/// - **Peak Usage**: During reversal of a single row group
+///
+/// ## Why Row-Group-Level Buffering is Necessary
+///
+/// Parquet organizes data into row groups (typically 128MB each), and each row group
+/// is independently compressed and encoded. When reading in reverse:
+///
+/// - We cannot reverse individual batches in isolation because they may span
+///   row group boundaries or be split arbitrarily by the batch_size parameter
+/// - We must buffer complete row groups to ensure correct ordering semantics
+/// - This is the minimum granularity that maintains correctness
+///
+/// ## Example
+///
+/// Given a file with 3 row groups, each containing 2 batches:
+///
+/// ```text
+/// Normal order:  RG0[B0, B1] -> RG1[B0, B1] -> RG2[B0, B1]
+/// Reversed:      RG2[B1_rev, B0_rev] -> RG1[B1_rev, B0_rev] -> RG0[B1_rev, B0_rev]
+///                ^^^^^^^^^^^^           ^^^^^^^^^^^^           ^^^^^^^^^^^^
+///                Output 1st             Output 2nd             Output 3rd
+/// ```
+///
+/// ## Performance Characteristics
+///
+/// - **Latency**: First batch available after reading complete first (reversed) row group
+/// - **Throughput**: Near-native speed with ~5-10% overhead for reversal operations
Review Comment:
This figure is my own estimate, not a measurement. I can add a benchmark for this operation as a follow-up.
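
For reference, here is a rough, std-only sketch of the kind of thing such a benchmark could time. It is not the PR's code: `Batch`, `reverse_row_group`, and `make_row_group` are hypothetical stand-ins, a plain `Vec<u32>` replaces a `RecordBatch`, and `Vec::reverse` replaces Arrow's `take` kernel, so the timing it prints says nothing about the real overhead. It only models the two-stage reversal described in the doc comment (reverse rows within each batch, then reverse the batch order within the buffered row group).

```rust
use std::time::Instant;

/// Hypothetical stand-in for a record batch: one column of row values.
type Batch = Vec<u32>;

/// Models the two-stage reversal of one buffered row group:
/// stage 1 reverses the rows inside every batch,
/// stage 2 reverses the order of the batches themselves.
fn reverse_row_group(mut batches: Vec<Batch>) -> Vec<Batch> {
    for batch in batches.iter_mut() {
        batch.reverse();
    }
    batches.reverse();
    batches
}

/// Builds one row group of `num_rows` sequential values starting at `start`,
/// split into batches of at most `batch_size` rows (must be non-zero).
fn make_row_group(start: u32, num_rows: u32, batch_size: usize) -> Vec<Batch> {
    let rows: Vec<u32> = (start..start + num_rows).collect();
    rows.chunks(batch_size).map(|chunk| chunk.to_vec()).collect()
}

fn main() {
    // Small correctness check: rows 0..5 in batches of 2 come back fully reversed.
    let reversed = reverse_row_group(make_row_group(0, 5, 2));
    assert_eq!(reversed, vec![vec![4], vec![3, 2], vec![1, 0]]);

    // Crude wall-clock timing of the reversal step alone (sizes are arbitrary).
    let row_group = make_row_group(0, 1_000_000, 8192);
    let started = Instant::now();
    let reversed = reverse_row_group(row_group);
    let elapsed = started.elapsed();
    println!("reversed {} batches in {:?}", reversed.len(), elapsed);
}
```

A real follow-up would compare a forward scan against the reversed scan on actual Parquet files, since the estimate above concerns the whole stream and not just the in-memory reversal.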
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]