This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 27de50d055 fix: Reverse row selection should respect the row group
index (#19557)
27de50d055 is described below
commit 27de50d0553b7bbf1ec4172ec87bc23c7f2a5acb
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Dec 31 14:35:33 2025 +0800
fix: Reverse row selection should respect the row group index (#19557)
## Which issue does this PR close?
- Closes [#19535](https://github.com/apache/datafusion/issues/19535)
## Rationale for this change
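When the access plan skips some row groups, the row selection only covers
the row groups that are actually scanned. `reverse_row_selection` mapped it
onto every row group in the file, so a reverse scan applied the selection to
the wrong row groups. Reverse row selection must respect the indexes of the
row groups being scanned.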
## What changes are included in this PR?
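`PreparedAccessPlan::reverse` now passes the original (pre-reversal) row
group indexes to `reverse_row_selection`. That function now builds its row
ranges (relative to the scanned data, not to the whole file) and the reversed
selection from those row groups only. `PreparedAccessPlan`, its fields, and
its `from_access_plan`/`reverse` methods are now `pub(crate)`, so the
`sort.rs` tests can exercise this code path directly.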
## Are these changes tested?
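Yes. This PR adds `test_reverse_scan_with_non_contiguous_row_groups` in
`opener.rs`. It also rewrites the `sort.rs` unit tests to go through
`PreparedAccessPlan::reverse`, including cases where some row groups are
skipped entirely.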
## Are there any user-facing changes?
No
---
datafusion/datasource-parquet/src/opener.rs | 119 +++-
datafusion/datasource-parquet/src/sort.rs | 865 ++++++++++++++++++++++++----
2 files changed, 851 insertions(+), 133 deletions(-)
diff --git a/datafusion/datasource-parquet/src/opener.rs
b/datafusion/datasource-parquet/src/opener.rs
index bea970f144..83bdf79c8f 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -121,16 +121,16 @@ pub(super) struct ParquetOpener {
}
/// Represents a prepared access plan with optional row selection
-struct PreparedAccessPlan {
+pub(crate) struct PreparedAccessPlan {
/// Row group indexes to read
- row_group_indexes: Vec<usize>,
+ pub(crate) row_group_indexes: Vec<usize>,
/// Optional row selection for filtering within row groups
- row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
+ pub(crate) row_selection:
Option<parquet::arrow::arrow_reader::RowSelection>,
}
impl PreparedAccessPlan {
/// Create a new prepared access plan from a ParquetAccessPlan
- fn from_access_plan(
+ pub(crate) fn from_access_plan(
access_plan: ParquetAccessPlan,
rg_metadata: &[RowGroupMetaData],
) -> Result<Self> {
@@ -144,17 +144,23 @@ impl PreparedAccessPlan {
}
/// Reverse the access plan for reverse scanning
- fn reverse(
+ pub(crate) fn reverse(
mut self,
file_metadata: &parquet::file::metadata::ParquetMetaData,
) -> Result<Self> {
+ // Get the row group indexes before reversing
+ let row_groups_to_scan = self.row_group_indexes.clone();
+
// Reverse the row group indexes
self.row_group_indexes =
self.row_group_indexes.into_iter().rev().collect();
// If we have a row selection, reverse it to match the new row group
order
if let Some(row_selection) = self.row_selection {
- self.row_selection =
- Some(reverse_row_selection(&row_selection, file_metadata)?);
+ self.row_selection = Some(reverse_row_selection(
+ &row_selection,
+ file_metadata,
+ &row_groups_to_scan, // Pass the original (non-reversed) row
group indexes
+ )?);
}
Ok(self)
@@ -964,7 +970,7 @@ mod test {
use std::sync::Arc;
use super::{ConstantColumns, constant_columns_from_stats};
- use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
+ use crate::{DefaultParquetFileReaderFactory, RowGroupAccess,
opener::ParquetOpener};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use bytes::{BufMut, BytesMut};
use datafusion_common::{
@@ -1851,4 +1857,101 @@ mod test {
"Reverse scan should reverse row group order while maintaining
correct RowSelection for each group"
);
}
+
+ #[tokio::test]
+ async fn test_reverse_scan_with_non_contiguous_row_groups() {
+ use parquet::file::properties::WriterProperties;
+
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ // Create 4 batches (4 row groups)
+ let batch0 = record_batch!(("a", Int32, vec![Some(1),
Some(2)])).unwrap();
+ let batch1 = record_batch!(("a", Int32, vec![Some(3),
Some(4)])).unwrap();
+ let batch2 = record_batch!(("a", Int32, vec![Some(5),
Some(6)])).unwrap();
+ let batch3 = record_batch!(("a", Int32, vec![Some(7),
Some(8)])).unwrap();
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(2)
+ .build();
+
+ let data_len = write_parquet_batches(
+ Arc::clone(&store),
+ "test.parquet",
+ vec![batch0.clone(), batch1, batch2, batch3],
+ Some(props),
+ )
+ .await;
+
+ let schema = batch0.schema();
+
+ use crate::ParquetAccessPlan;
+ use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+
+ // KEY: Skip RG1 (non-contiguous!)
+ // Only scan row groups: [0, 2, 3]
+ let mut access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan, // RG0
+ RowGroupAccess::Skip, // RG1 - SKIPPED!
+ RowGroupAccess::Scan, // RG2
+ RowGroupAccess::Scan, // RG3
+ ]);
+
+ // Add RowSelection for each scanned row group
+ // RG0: select first row (1), skip second (2)
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::select(1),
RowSelector::skip(1)]),
+ );
+ // RG1: skipped, no selection needed
+ // RG2: select first row (5), skip second (6)
+ access_plan.scan_selection(
+ 2,
+ RowSelection::from(vec![RowSelector::select(1),
RowSelector::skip(1)]),
+ );
+ // RG3: select first row (7), skip second (8)
+ access_plan.scan_selection(
+ 3,
+ RowSelection::from(vec![RowSelector::select(1),
RowSelector::skip(1)]),
+ );
+
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_len).unwrap(),
+ )
+ .with_extensions(Arc::new(access_plan));
+
+ let make_opener = |reverse_scan: bool| {
+ ParquetOpenerBuilder::new()
+ .with_store(Arc::clone(&store))
+ .with_schema(Arc::clone(&schema))
+ .with_projection_indices(&[0])
+ .with_reverse_row_groups(reverse_scan)
+ .build()
+ };
+
+ // Forward scan: RG0(1), RG2(5), RG3(7)
+ // Note: RG1 is completely skipped
+ let opener = make_opener(false);
+ let stream = opener.open(file.clone()).unwrap().await.unwrap();
+ let forward_values = collect_int32_values(stream).await;
+
+ assert_eq!(
+ forward_values,
+ vec![1, 5, 7],
+ "Forward scan with non-contiguous row groups"
+ );
+
+ // Reverse scan: RG3(7), RG2(5), RG0(1)
+ // WITHOUT the bug fix, this would return WRONG values
+ // because the RowSelection would be incorrectly mapped
+ let opener = make_opener(true);
+ let stream = opener.open(file).unwrap().await.unwrap();
+ let reverse_values = collect_int32_values(stream).await;
+
+ assert_eq!(
+ reverse_values,
+ vec![7, 5, 1],
+ "Reverse scan with non-contiguous row groups should correctly map
RowSelection"
+ );
+ }
}
diff --git a/datafusion/datasource-parquet/src/sort.rs
b/datafusion/datasource-parquet/src/sort.rs
index 4255d4d696..abc50eeb31 100644
--- a/datafusion/datasource-parquet/src/sort.rs
+++ b/datafusion/datasource-parquet/src/sort.rs
@@ -31,25 +31,43 @@ use std::collections::HashMap;
/// 3. Reconstructs the row selection for the new order
///
/// # Arguments
-/// * `row_selection` - Original row selection
+/// * `row_selection` - Original row selection (only covers row groups that
are scanned)
/// * `parquet_metadata` - Metadata containing row group information
+/// * `row_groups_to_scan` - Indexes of row groups that will be scanned (in
original order)
///
/// # Returns
/// A new `RowSelection` adjusted for reversed row group order
+///
+/// # Important Notes
+/// The input `row_selection` only covers the row groups specified in
`row_groups_to_scan`.
+/// Row groups that are skipped (not in `row_groups_to_scan`) are not
represented in the
+/// `row_selection` at all. This function needs `row_groups_to_scan` to
correctly map
+/// the selection back to the original row groups.
pub fn reverse_row_selection(
row_selection: &RowSelection,
parquet_metadata: &ParquetMetaData,
+ row_groups_to_scan: &[usize],
) -> Result<RowSelection> {
let rg_metadata = parquet_metadata.row_groups();
- // Build a mapping of row group index to its row range in the file
+ // Build a mapping of row group index to its row range, but ONLY for
+ // the row groups that are actually being scanned.
+ //
+ // IMPORTANT: The row numbers in this mapping are RELATIVE to the scanned
row groups,
+ // not absolute positions in the file.
+ //
+ // Example: If row_groups_to_scan = [0, 2, 3] and each has 100 rows:
+ // RG0: rows 0-99 (relative to scanned data)
+ // RG2: rows 100-199 (relative to scanned data, NOT 200-299 in file!)
+ // RG3: rows 200-299 (relative to scanned data, NOT 300-399 in file!)
let mut rg_row_ranges: Vec<(usize, usize, usize)> =
- Vec::with_capacity(rg_metadata.len());
+ Vec::with_capacity(row_groups_to_scan.len());
let mut current_row = 0;
- for (rg_idx, rg) in rg_metadata.iter().enumerate() {
+ for &rg_idx in row_groups_to_scan {
+ let rg = &rg_metadata[rg_idx];
let num_rows = rg.num_rows() as usize;
rg_row_ranges.push((rg_idx, current_row, current_row + num_rows));
- current_row += num_rows;
+ current_row += num_rows; // This is relative row number, NOT absolute
file position
}
// Map selections to row groups
@@ -82,12 +100,13 @@ pub fn reverse_row_selection(
}
// Build new selection for reversed row group order
+ // Only iterate over the row groups that are being scanned, in reverse
order
let mut reversed_selectors = Vec::new();
- for rg_idx in (0..rg_metadata.len()).rev() {
+ for &rg_idx in row_groups_to_scan.iter().rev() {
if let Some(selectors) = rg_selections.get(&rg_idx) {
reversed_selectors.extend(selectors.iter().cloned());
} else {
- // No specific selection for this row group means select all
+ // No specific selection for this row group means select all rows
in it
if let Some((_, start, end)) =
rg_row_ranges.iter().find(|(idx, _, _)| *idx == rg_idx)
{
@@ -101,32 +120,30 @@ pub fn reverse_row_selection(
#[cfg(test)]
mod tests {
- use super::*;
+ use crate::ParquetAccessPlan;
+ use crate::RowGroupAccess;
+ use crate::opener::PreparedAccessPlan;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::Bytes;
use parquet::arrow::ArrowWriter;
+ use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::SerializedFileReader;
use std::sync::Arc;
/// Helper function to create a ParquetMetaData with specified row group
sizes
/// by actually writing a parquet file in memory
- fn create_test_metadata(row_group_sizes: Vec<i64>) -> ParquetMetaData {
- // Create a simple schema
+ fn create_test_metadata(
+ row_group_sizes: Vec<i64>,
+ ) -> parquet::file::metadata::ParquetMetaData {
let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
-
- // Create in-memory parquet file with the specified row groups
let mut buffer = Vec::new();
{
- let props = parquet::file::properties::WriterProperties::builder()
- .set_max_row_group_size(row_group_sizes[0] as usize)
- .build();
-
+ let props =
parquet::file::properties::WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(&mut buffer, schema.clone(),
Some(props)).unwrap();
for &size in &row_group_sizes {
- // Create a batch with the specified number of rows
let array = arrow::array::Int32Array::from(vec![1; size as
usize]);
let batch = arrow::record_batch::RecordBatch::try_new(
schema.clone(),
@@ -134,34 +151,131 @@ mod tests {
)
.unwrap();
writer.write(&batch).unwrap();
+ writer.flush().unwrap();
}
writer.close().unwrap();
}
- // Read back the metadata
let bytes = Bytes::from(buffer);
let reader = SerializedFileReader::new(bytes).unwrap();
reader.metadata().clone()
}
#[test]
- fn test_reverse_simple_selection() {
- // 3 row groups with 100 rows each
+ fn test_prepared_access_plan_reverse_simple() {
+ // Test: all row groups are scanned, no row selection
+ let metadata = create_test_metadata(vec![100, 100, 100]);
+
+ let access_plan = ParquetAccessPlan::new_all(3);
+ let rg_metadata = metadata.row_groups();
+
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ // Verify original plan
+ assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]);
+
+ // No row selection originally due to scanning all rows
+ assert_eq!(prepared_plan.row_selection, None);
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // Verify row groups are reversed
+ assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]);
+
+ // If no selection originally, after reversal should still select all
rows,
+ // and the selection should be None
+ assert_eq!(reversed_plan.row_selection, None);
+ }
+
+ #[test]
+ fn test_prepared_access_plan_reverse_with_selection() {
+ // Test: simple row selection that spans multiple row groups
let metadata = create_test_metadata(vec![100, 100, 100]);
- // Select first 50 rows from first row group
- let selection =
- RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(250)]);
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+
+ // Select first 50 rows from first row group, skip rest
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(50)]),
+ );
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ assert_eq!(
+ original_selected, reversed_selected,
+ "Total selected rows should remain the same"
+ );
+ }
+
+ #[test]
+ fn test_prepared_access_plan_reverse_multi_row_group_selection() {
+ // Test: row selection spanning multiple row groups
+ let metadata = create_test_metadata(vec![100, 100, 100]);
- // Verify total selected rows remain the same
- let original_selected: usize = selection
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+
+ // Create selection that spans RG0 and RG1
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::skip(50),
RowSelector::select(50)]),
+ );
+ access_plan.scan_selection(
+ 1,
+ RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(50)]),
+ );
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
@@ -171,25 +285,80 @@ mod tests {
}
#[test]
- fn test_reverse_multi_row_group_selection() {
+ fn test_prepared_access_plan_reverse_empty_selection() {
+ // Test: all rows are skipped
let metadata = create_test_metadata(vec![100, 100, 100]);
- // Select rows spanning multiple row groups
- let selection = RowSelection::from(vec![
- RowSelector::skip(50),
- RowSelector::select(100), // Spans RG0 and RG1
- RowSelector::skip(150),
- ]);
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+
+ // Skip all rows in all row groups
+ for i in 0..3 {
+ access_plan
+ .scan_selection(i,
RowSelection::from(vec![RowSelector::skip(100)]));
+ }
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
- // Verify total selected rows remain the same
- let original_selected: usize = selection
+ // Should still skip all rows
+ let total_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+
+ assert_eq!(total_selected, 0);
+ }
+
+ #[test]
+ fn test_prepared_access_plan_reverse_different_row_group_sizes() {
+ // Test: row groups with different sizes
+ let metadata = create_test_metadata(vec![50, 150, 100]);
+
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+
+ // Create complex selection pattern
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::skip(25),
RowSelector::select(25)]),
+ );
+ access_plan.scan_selection(1,
RowSelection::from(vec![RowSelector::select(150)]));
+ access_plan.scan_selection(
+ 2,
+ RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(50)]),
+ );
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
@@ -199,209 +368,655 @@ mod tests {
}
#[test]
- fn test_reverse_full_selection() {
- let metadata = create_test_metadata(vec![100, 100, 100]);
+ fn test_prepared_access_plan_reverse_single_row_group() {
+ // Test: single row group case
+ let metadata = create_test_metadata(vec![100]);
- // Select all rows
- let selection = RowSelection::from(vec![RowSelector::select(300)]);
+ let mut access_plan = ParquetAccessPlan::new_all(1);
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(50)]),
+ );
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
- // Should still select all rows, just in reversed row group order
- let total_selected: usize = reversed
+ // With single row group, row_group_indexes should remain [0]
+ assert_eq!(reversed_plan.row_group_indexes, vec![0]);
+
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- assert_eq!(total_selected, 300);
+ assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 50);
}
#[test]
- fn test_reverse_empty_selection() {
+ fn test_prepared_access_plan_reverse_complex_pattern() {
+ // Test: complex pattern with multiple select/skip segments
let metadata = create_test_metadata(vec![100, 100, 100]);
- // Skip all rows
- let selection = RowSelection::from(vec![RowSelector::skip(300)]);
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+
+ // Complex pattern: select some, skip some, select some more
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![
+ RowSelector::select(30),
+ RowSelector::skip(40),
+ RowSelector::select(30),
+ ]),
+ );
+ access_plan.scan_selection(
+ 1,
+ RowSelection::from(vec![RowSelector::skip(50),
RowSelector::select(50)]),
+ );
+ access_plan.scan_selection(2,
RowSelection::from(vec![RowSelector::select(100)]));
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .filter(|s| !s.skip)
+ .map(|s| s.row_count)
+ .sum();
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
- // Should still skip all rows
- let total_selected: usize = reversed
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- assert_eq!(total_selected, 0);
+ assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 210); // 30 + 30 + 50 + 100
}
#[test]
- fn test_reverse_with_different_row_group_sizes() {
- let metadata = create_test_metadata(vec![50, 150, 100]);
+ fn test_prepared_access_plan_reverse_with_skipped_row_groups() {
+ // This is the KEY test case for the bug fix!
+ // Test scenario where some row groups are completely skipped (not in
scan plan)
+ let metadata = create_test_metadata(vec![100, 100, 100, 100]);
- let selection = RowSelection::from(vec![
- RowSelector::skip(25),
- RowSelector::select(200), // Spans all row groups
- RowSelector::skip(75),
+ // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial),
RG3 (scan all)
+ // Only row groups [0, 2, 3] are in the scan plan
+ let mut access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan, // RG0
+ RowGroupAccess::Skip, // RG1 - NOT in scan plan!
+ RowGroupAccess::Scan, // RG2
+ RowGroupAccess::Scan, // RG3
]);
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
-
- let original_selected: usize = selection
+ // Add row selections for the scanned row groups
+ // Note: The RowSelection only covers row groups [0, 2, 3] (300 rows
total)
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::select(100)]), // RG0: all
100 rows
+ );
+ // RG1 is skipped, no selection needed
+ access_plan.scan_selection(
+ 2,
+ RowSelection::from(vec![
+ RowSelector::select(25), // RG2: first 25 rows
+ RowSelector::skip(75), // RG2: skip last 75 rows
+ ]),
+ );
+ access_plan.scan_selection(
+ 3,
+ RowSelection::from(vec![RowSelector::select(100)]), // RG3: all
100 rows
+ );
+
+ let rg_metadata = metadata.row_groups();
+
+ // Step 1: Create PreparedAccessPlan
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ // Verify original plan
+ assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+ assert_eq!(original_selected, 225); // 100 + 25 + 100
+
+ // Step 2: Reverse the plan (this is the production code path)
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // Verify reversed results
+ // Row group order should be reversed: [3, 2, 0]
+ assert_eq!(
+ reversed_plan.row_group_indexes,
+ vec![3, 2, 0],
+ "Row groups should be reversed"
+ );
+
+ // Verify row selection is also correctly reversed
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- assert_eq!(original_selected, reversed_selected);
+ assert_eq!(
+ reversed_selected, 225,
+ "Total selected rows should remain the same"
+ );
+
+ // Verify the reversed selection structure
+ // After reversal, the order becomes: RG3, RG2, RG0
+ // - RG3: select(100)
+ // - RG2: select(25), skip(75) (note: internal order preserved, not
reversed)
+ // - RG0: select(100)
+ //
+ // After RowSelection::from() merges adjacent selectors of the same
type:
+ // - RG3's select(100) + RG2's select(25) = select(125)
+ // - RG2's skip(75) remains as skip(75)
+ // - RG0's select(100) remains as select(100)
+ let selectors: Vec<_> = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
+ assert_eq!(selectors.len(), 3);
+
+ // RG3 (100) + RG2 first part (25) merged into select(125)
+ assert!(!selectors[0].skip);
+ assert_eq!(selectors[0].row_count, 125);
+
+ // RG2: skip last 75 rows
+ assert!(selectors[1].skip);
+ assert_eq!(selectors[1].row_count, 75);
+
+ // RG0: select all 100 rows
+ assert!(!selectors[2].skip);
+ assert_eq!(selectors[2].row_count, 100);
}
#[test]
- fn test_reverse_single_row_group() {
- let metadata = create_test_metadata(vec![100]);
+ fn test_prepared_access_plan_reverse_alternating_row_groups() {
+ // Test with alternating scan/skip pattern
+ let metadata = create_test_metadata(vec![100, 100, 100, 100]);
- let selection =
- RowSelection::from(vec![RowSelector::select(50),
RowSelector::skip(50)]);
+ // Scan RG0 and RG2, skip RG1 and RG3
+ let mut access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan, // RG0
+ RowGroupAccess::Skip, // RG1
+ RowGroupAccess::Scan, // RG2
+ RowGroupAccess::Skip, // RG3
+ ]);
+
+ access_plan.scan_selection(0,
RowSelection::from(vec![RowSelector::select(100)]));
+ access_plan.scan_selection(2,
RowSelection::from(vec![RowSelector::select(100)]));
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
- // With single row group, selection should remain the same
- let original_selected: usize = selection
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+
+ // Original: [0, 2]
+ assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // After reverse: [2, 0]
+ assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]);
+
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
assert_eq!(original_selected, reversed_selected);
+ assert_eq!(original_selected, 200);
}
#[test]
- fn test_reverse_complex_pattern() {
+ fn test_prepared_access_plan_reverse_middle_row_group_only() {
+ // Test selecting only the middle row group
let metadata = create_test_metadata(vec![100, 100, 100]);
- // Complex pattern: select some, skip some, select some more
- let selection = RowSelection::from(vec![
- RowSelector::select(30),
- RowSelector::skip(40),
- RowSelector::select(80),
- RowSelector::skip(50),
- RowSelector::select(100),
+ let mut access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Skip, // RG0
+ RowGroupAccess::Scan, // RG1
+ RowGroupAccess::Skip, // RG2
]);
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ access_plan.scan_selection(
+ 1,
+ RowSelection::from(vec![RowSelector::select(100)]), // Select all
of RG1
+ );
- let original_selected: usize = selection
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ let original_selected: usize = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+
+ // Original: [1]
+ assert_eq!(prepared_plan.row_group_indexes, vec![1]);
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // After reverse: still [1] (only one row group)
+ assert_eq!(reversed_plan.row_group_indexes, vec![1]);
+
+ let reversed_selected: usize = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
assert_eq!(original_selected, reversed_selected);
- assert_eq!(original_selected, 210); // 30 + 80 + 100
+ assert_eq!(original_selected, 100);
}
#[test]
- fn test_reverse_with_skipped_row_group() {
- // This test covers the "no specific selection" code path (lines 90-95)
- let metadata = create_test_metadata(vec![100, 100, 100]);
+ fn test_prepared_access_plan_reverse_with_skipped_row_groups_detailed() {
+ // This is the KEY test case for the bug fix!
+ // Test scenario where some row groups are completely skipped (not in
scan plan)
+ // This version includes DETAILED verification of the selector
distribution
+ let metadata = create_test_metadata(vec![100, 100, 100, 100]);
- // Select only from first and third row groups, skip middle one
entirely
- let selection = RowSelection::from(vec![
- RowSelector::select(50), // First 50 of RG0
- RowSelector::skip(150), // Rest of RG0 + all of RG1 + half of RG2
- RowSelector::select(50), // Last 50 of RG2
+ // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial),
RG3 (scan all)
+ // Only row groups [0, 2, 3] are in the scan plan
+ let mut access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan, // RG0
+ RowGroupAccess::Skip, // RG1 - NOT in scan plan!
+ RowGroupAccess::Scan, // RG2
+ RowGroupAccess::Scan, // RG3
]);
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
-
- // Verify total selected rows remain the same
- let original_selected: usize = selection
+ // Add row selections for the scanned row groups
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::select(100)]), // RG0: all
100 rows
+ );
+ // RG1 is skipped, no selection needed
+ access_plan.scan_selection(
+ 2,
+ RowSelection::from(vec![
+ RowSelector::select(25), // RG2: first 25 rows
+ RowSelector::skip(75), // RG2: skip last 75 rows
+ ]),
+ );
+ access_plan.scan_selection(
+ 3,
+ RowSelection::from(vec![RowSelector::select(100)]), // RG3: all
100 rows
+ );
+
+ let rg_metadata = metadata.row_groups();
+
+ // Step 1: Create PreparedAccessPlan
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ // Verify original plan in detail
+ assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
+
+ // Detailed verification of original selection
+ let orig_selectors: Vec<_> = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
+
+ // Original structure should be:
+ // RG0: select(100)
+ // RG2: select(25), skip(75)
+ // RG3: select(100)
+ // After merging by RowSelection::from(): select(125), skip(75),
select(100)
+ assert_eq!(
+ orig_selectors.len(),
+ 3,
+ "Original should have 3 selectors after merging"
+ );
+ assert!(
+ !orig_selectors[0].skip && orig_selectors[0].row_count == 125,
+ "Original: First selector should be select(125) from RG0(100) +
RG2(25)"
+ );
+ assert!(
+ orig_selectors[1].skip && orig_selectors[1].row_count == 75,
+ "Original: Second selector should be skip(75) from RG2"
+ );
+ assert!(
+ !orig_selectors[2].skip && orig_selectors[2].row_count == 100,
+ "Original: Third selector should be select(100) from RG3"
+ );
+
+ let original_selected: usize = orig_selectors
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+ assert_eq!(original_selected, 225); // 100 + 25 + 100
+
+ // Step 2: Reverse the plan (this is the production code path)
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // Verify reversed results
+ // Row group order should be reversed: [3, 2, 0]
+ assert_eq!(
+ reversed_plan.row_group_indexes,
+ vec![3, 2, 0],
+ "Row groups should be reversed"
+ );
+
+ // Detailed verification of reversed selection
+ let rev_selectors: Vec<_> = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
+
+ // After reversal, the order becomes: RG3, RG2, RG0
+ // - RG3: select(100)
+ // - RG2: select(25), skip(75) (note: internal order preserved, not
reversed)
+ // - RG0: select(100)
+ //
+ // After RowSelection::from() merges adjacent selectors of the same
type:
+ // - RG3's select(100) + RG2's select(25) = select(125)
+ // - RG2's skip(75) remains as skip(75)
+ // - RG0's select(100) remains as select(100)
+
+ assert_eq!(
+ rev_selectors.len(),
+ 3,
+ "Reversed should have 3 selectors after merging"
+ );
+
+ // First selector: RG3 (100) + RG2 first part (25) merged into
select(125)
+ assert!(
+ !rev_selectors[0].skip && rev_selectors[0].row_count == 125,
+ "Reversed: First selector should be select(125) from RG3(100) +
RG2(25), got skip={} count={}",
+ rev_selectors[0].skip,
+ rev_selectors[0].row_count
+ );
+
+ // Second selector: RG2 skip last 75 rows
+ assert!(
+ rev_selectors[1].skip && rev_selectors[1].row_count == 75,
+ "Reversed: Second selector should be skip(75) from RG2, got
skip={} count={}",
+ rev_selectors[1].skip,
+ rev_selectors[1].row_count
+ );
+
+ // Third selector: RG0 select all 100 rows
+ assert!(
+ !rev_selectors[2].skip && rev_selectors[2].row_count == 100,
+ "Reversed: Third selector should be select(100) from RG0, got
skip={} count={}",
+ rev_selectors[2].skip,
+ rev_selectors[2].row_count
+ );
+
+ // Verify row selection is also correctly reversed (total count)
+ let reversed_selected: usize = rev_selectors
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- assert_eq!(original_selected, reversed_selected);
- assert_eq!(original_selected, 100); // 50 + 50
+ assert_eq!(
+ reversed_selected, 225,
+ "Total selected rows should remain the same"
+ );
}
#[test]
- fn test_reverse_middle_row_group_only() {
- // Another test to ensure skipped row groups are handled correctly
+ fn test_prepared_access_plan_reverse_complex_pattern_detailed() {
+ // Test: multi-selector patterns in every row group; asserts selected-row totals and reversed row-group order
let metadata = create_test_metadata(vec![100, 100, 100]);
- // Select only middle row group
- let selection = RowSelection::from(vec![
- RowSelector::skip(100), // Skip RG0
- RowSelector::select(100), // Select all of RG1
- RowSelector::skip(100), // Skip RG2
- ]);
+ let mut access_plan = ParquetAccessPlan::new_all(3);
+
+ // RG0: select/skip/select; RG1: skip first 50, select last 50; RG2: select all 100
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![
+ RowSelector::select(30),
+ RowSelector::skip(40),
+ RowSelector::select(30),
+ ]),
+ );
+ access_plan.scan_selection(
+ 1,
+ RowSelection::from(vec![RowSelector::skip(50), RowSelector::select(50)]),
+ );
+ access_plan.scan_selection(2, RowSelection::from(vec![RowSelector::select(100)]));
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ // Collect the forward selectors (only their selected-row total is asserted below)
+ let orig_selectors: Vec<_> = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ // RG0: select(30), skip(40), select(30)
+ // RG1: skip(50), select(50)
+ // RG2: select(100)
+ // Sequential: sel(30), skip(40), sel(30), skip(50), sel(50), sel(100)
+ // After merge: sel(30), skip(40), sel(30), skip(50), sel(150)
- let original_selected: usize = selection
+ let original_selected: usize = orig_selectors
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+ assert_eq!(original_selected, 210); // 30 + 30 + 50 + 100
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // Collect the reversed selectors (only their selected-row total is asserted below)
+ let rev_selectors: Vec<_> = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
+
+ // After reversal: RG2, RG1, RG0 (each row group's own selectors keep their order)
+ // RG2: select(100)
+ // RG1: skip(50), select(50)
+ // RG0: select(30), skip(40), select(30)
+ // Sequential: sel(100), skip(50), sel(50), sel(30), skip(40), sel(30)
+ // After merge: sel(100), skip(50), sel(80), skip(40), sel(30)
+
+ let reversed_selected: usize = rev_selectors
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- assert_eq!(original_selected, reversed_selected);
- assert_eq!(original_selected, 100);
+ assert_eq!(
+ reversed_selected, 210,
+ "Total selected rows should remain the same (30 + 30 + 50 + 100)"
+ );
+
+ // Row groups are emitted in reverse order
+ assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]);
}
#[test]
- fn test_reverse_alternating_row_groups() {
- // Test with more complex skipping pattern
+ fn test_prepared_access_plan_reverse_alternating_detailed() {
+ // Test: RG1/RG3 skipped, RG0/RG2 partially selected; asserts indexes, totals and exact reversed selectors
let metadata = create_test_metadata(vec![100, 100, 100, 100]);
- // Select first and third row groups, skip second and fourth
- let selection = RowSelection::from(vec![
- RowSelector::select(100), // RG0
- RowSelector::skip(100), // RG1
- RowSelector::select(100), // RG2
- RowSelector::skip(100), // RG3
+ // Scan RG0 and RG2, skip RG1 and RG3
+ let mut access_plan = ParquetAccessPlan::new(vec![
+ RowGroupAccess::Scan, // RG0
+ RowGroupAccess::Skip, // RG1
+ RowGroupAccess::Scan, // RG2
+ RowGroupAccess::Skip, // RG3
]);
- let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+ access_plan.scan_selection(
+ 0,
+ RowSelection::from(vec![RowSelector::select(30), RowSelector::skip(70)]),
+ );
+ access_plan.scan_selection(
+ 2,
+ RowSelection::from(vec![RowSelector::skip(20), RowSelector::select(80)]),
+ );
+
+ let rg_metadata = metadata.row_groups();
+ let prepared_plan =
+ PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+ .expect("Failed to create PreparedAccessPlan");
+
+ // Original: [0, 2] (the skipped RG1 and RG3 are not listed)
+ assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);
+
+ // Collect the forward selectors (only their selected-row total is asserted below)
+ let orig_selectors: Vec<_> = prepared_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
+
+ // Original:
+ // RG0: select(30), skip(70)
+ // RG2: skip(20), select(80)
+ // Sequential: sel(30), skip(90), sel(80)
+ // (RG0's skip(70) + RG2's skip(20) = skip(90))
- let original_selected: usize = selection
+ let original_selected: usize = orig_selectors
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- let reversed_selected: usize = reversed
+ assert_eq!(original_selected, 110); // 30 + 80
+
+ let reversed_plan = prepared_plan
+ .reverse(&metadata)
+ .expect("Failed to reverse PreparedAccessPlan");
+
+ // After reverse: [2, 0]
+ assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]);
+
+ // Collect the reversed selectors for the exact-layout checks below
+ let rev_selectors: Vec<_> = reversed_plan
+ .row_selection
+ .as_ref()
+ .unwrap()
+ .iter()
+ .collect();
+
+ // After reversal: RG2, RG0
+ // RG2: skip(20), select(80)
+ // RG0: select(30), skip(70)
+ // Sequential: skip(20), sel(110), skip(70)
+ // (RG2's select(80) + RG0's select(30) = select(110))
+
+ let reversed_selected: usize = rev_selectors
.iter()
.filter(|s| !s.skip)
.map(|s| s.row_count)
.sum();
- assert_eq!(original_selected, reversed_selected);
- assert_eq!(original_selected, 200);
+ assert_eq!(reversed_selected, 110); // Should still be 30 + 80
+
+ // Exact layout: the 3 selectors cover only RG2 + RG0 (200 rows); skipped RG1/RG3 contribute none
+ assert_eq!(rev_selectors.len(), 3, "Reversed should have 3 selectors");
+
+ assert!(
+ rev_selectors[0].skip && rev_selectors[0].row_count == 20,
+ "First selector should be skip(20) from RG2"
+ );
+
+ assert!(
+ !rev_selectors[1].skip && rev_selectors[1].row_count == 110,
+ "Second selector should be select(110) from RG2(80) + RG0(30)"
+ );
+
+ assert!(
+ rev_selectors[2].skip && rev_selectors[2].row_count == 70,
+ "Third selector should be skip(70) from RG0"
+ );
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]