This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 27de50d055 fix: Reverse row selection should respect the row group 
index (#19557)
27de50d055 is described below

commit 27de50d0553b7bbf1ec4172ec87bc23c7f2a5acb
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Dec 31 14:35:33 2025 +0800

    fix: Reverse row selection should respect the row group index (#19557)
    
    ## Which issue does this PR close?
    
    - Closes [#19535](https://github.com/apache/datafusion/issues/19535)
    
    ## Rationale for this change
    
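    Reversing a row selection must respect the indexes of the row groups
    that are actually scanned. The row selection only covers the scanned
    row groups, so when some row groups were skipped, the reversed selection
    could be mapped onto the wrong row groups. This PR fixes that.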
    
    ## What changes are included in this PR?
    
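    `reverse_row_selection` now takes the indexes of the row groups being
    scanned (`row_groups_to_scan`) and builds its row ranges relative to
    those row groups only. `PreparedAccessPlan::reverse` passes it the
    original (non-reversed) row group indexes, and tests covering skipped
    (non-contiguous) row groups are added.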
    
    
    ## Are these changes tested?
    
    Yes
    
    ## Are there any user-facing changes?
    
    No
---
 datafusion/datasource-parquet/src/opener.rs | 119 +++-
 datafusion/datasource-parquet/src/sort.rs   | 865 ++++++++++++++++++++++++----
 2 files changed, 851 insertions(+), 133 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index bea970f144..83bdf79c8f 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -121,16 +121,16 @@ pub(super) struct ParquetOpener {
 }
 
 /// Represents a prepared access plan with optional row selection
-struct PreparedAccessPlan {
+pub(crate) struct PreparedAccessPlan {
     /// Row group indexes to read
-    row_group_indexes: Vec<usize>,
+    pub(crate) row_group_indexes: Vec<usize>,
     /// Optional row selection for filtering within row groups
-    row_selection: Option<parquet::arrow::arrow_reader::RowSelection>,
+    pub(crate) row_selection: 
Option<parquet::arrow::arrow_reader::RowSelection>,
 }
 
 impl PreparedAccessPlan {
     /// Create a new prepared access plan from a ParquetAccessPlan
-    fn from_access_plan(
+    pub(crate) fn from_access_plan(
         access_plan: ParquetAccessPlan,
         rg_metadata: &[RowGroupMetaData],
     ) -> Result<Self> {
@@ -144,17 +144,23 @@ impl PreparedAccessPlan {
     }
 
     /// Reverse the access plan for reverse scanning
-    fn reverse(
+    pub(crate) fn reverse(
         mut self,
         file_metadata: &parquet::file::metadata::ParquetMetaData,
     ) -> Result<Self> {
+        // Get the row group indexes before reversing
+        let row_groups_to_scan = self.row_group_indexes.clone();
+
         // Reverse the row group indexes
         self.row_group_indexes = 
self.row_group_indexes.into_iter().rev().collect();
 
         // If we have a row selection, reverse it to match the new row group 
order
         if let Some(row_selection) = self.row_selection {
-            self.row_selection =
-                Some(reverse_row_selection(&row_selection, file_metadata)?);
+            self.row_selection = Some(reverse_row_selection(
+                &row_selection,
+                file_metadata,
+                &row_groups_to_scan, // Pass the original (non-reversed) row 
group indexes
+            )?);
         }
 
         Ok(self)
@@ -964,7 +970,7 @@ mod test {
     use std::sync::Arc;
 
     use super::{ConstantColumns, constant_columns_from_stats};
-    use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
+    use crate::{DefaultParquetFileReaderFactory, RowGroupAccess, 
opener::ParquetOpener};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
@@ -1851,4 +1857,101 @@ mod test {
             "Reverse scan should reverse row group order while maintaining 
correct RowSelection for each group"
         );
     }
+
+    #[tokio::test]
+    async fn test_reverse_scan_with_non_contiguous_row_groups() {
+        use parquet::file::properties::WriterProperties;
+
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+        // Create 4 batches (4 row groups)
+        let batch0 = record_batch!(("a", Int32, vec![Some(1), 
Some(2)])).unwrap();
+        let batch1 = record_batch!(("a", Int32, vec![Some(3), 
Some(4)])).unwrap();
+        let batch2 = record_batch!(("a", Int32, vec![Some(5), 
Some(6)])).unwrap();
+        let batch3 = record_batch!(("a", Int32, vec![Some(7), 
Some(8)])).unwrap();
+
+        let props = WriterProperties::builder()
+            .set_max_row_group_size(2)
+            .build();
+
+        let data_len = write_parquet_batches(
+            Arc::clone(&store),
+            "test.parquet",
+            vec![batch0.clone(), batch1, batch2, batch3],
+            Some(props),
+        )
+        .await;
+
+        let schema = batch0.schema();
+
+        use crate::ParquetAccessPlan;
+        use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
+
+        // KEY: Skip RG1 (non-contiguous!)
+        // Only scan row groups: [0, 2, 3]
+        let mut access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan, // RG0
+            RowGroupAccess::Skip, // RG1 - SKIPPED!
+            RowGroupAccess::Scan, // RG2
+            RowGroupAccess::Scan, // RG3
+        ]);
+
+        // Add RowSelection for each scanned row group
+        // RG0: select first row (1), skip second (2)
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::select(1), 
RowSelector::skip(1)]),
+        );
+        // RG1: skipped, no selection needed
+        // RG2: select first row (5), skip second (6)
+        access_plan.scan_selection(
+            2,
+            RowSelection::from(vec![RowSelector::select(1), 
RowSelector::skip(1)]),
+        );
+        // RG3: select first row (7), skip second (8)
+        access_plan.scan_selection(
+            3,
+            RowSelection::from(vec![RowSelector::select(1), 
RowSelector::skip(1)]),
+        );
+
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_len).unwrap(),
+        )
+        .with_extensions(Arc::new(access_plan));
+
+        let make_opener = |reverse_scan: bool| {
+            ParquetOpenerBuilder::new()
+                .with_store(Arc::clone(&store))
+                .with_schema(Arc::clone(&schema))
+                .with_projection_indices(&[0])
+                .with_reverse_row_groups(reverse_scan)
+                .build()
+        };
+
+        // Forward scan: RG0(1), RG2(5), RG3(7)
+        // Note: RG1 is completely skipped
+        let opener = make_opener(false);
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let forward_values = collect_int32_values(stream).await;
+
+        assert_eq!(
+            forward_values,
+            vec![1, 5, 7],
+            "Forward scan with non-contiguous row groups"
+        );
+
+        // Reverse scan: RG3(7), RG2(5), RG0(1)
+        // WITHOUT the bug fix, this would return WRONG values
+        // because the RowSelection would be incorrectly mapped
+        let opener = make_opener(true);
+        let stream = opener.open(file).unwrap().await.unwrap();
+        let reverse_values = collect_int32_values(stream).await;
+
+        assert_eq!(
+            reverse_values,
+            vec![7, 5, 1],
+            "Reverse scan with non-contiguous row groups should correctly map 
RowSelection"
+        );
+    }
 }
diff --git a/datafusion/datasource-parquet/src/sort.rs 
b/datafusion/datasource-parquet/src/sort.rs
index 4255d4d696..abc50eeb31 100644
--- a/datafusion/datasource-parquet/src/sort.rs
+++ b/datafusion/datasource-parquet/src/sort.rs
@@ -31,25 +31,43 @@ use std::collections::HashMap;
 /// 3. Reconstructs the row selection for the new order
 ///
 /// # Arguments
-/// * `row_selection` - Original row selection
+/// * `row_selection` - Original row selection (only covers row groups that 
are scanned)
 /// * `parquet_metadata` - Metadata containing row group information
+/// * `row_groups_to_scan` - Indexes of row groups that will be scanned (in 
original order)
 ///
 /// # Returns
 /// A new `RowSelection` adjusted for reversed row group order
+///
+/// # Important Notes
+/// The input `row_selection` only covers the row groups specified in 
`row_groups_to_scan`.
+/// Row groups that are skipped (not in `row_groups_to_scan`) are not 
represented in the
+/// `row_selection` at all. This function needs `row_groups_to_scan` to 
correctly map
+/// the selection back to the original row groups.
 pub fn reverse_row_selection(
     row_selection: &RowSelection,
     parquet_metadata: &ParquetMetaData,
+    row_groups_to_scan: &[usize],
 ) -> Result<RowSelection> {
     let rg_metadata = parquet_metadata.row_groups();
 
-    // Build a mapping of row group index to its row range in the file
+    // Build a mapping of row group index to its row range, but ONLY for
+    // the row groups that are actually being scanned.
+    //
+    // IMPORTANT: The row numbers in this mapping are RELATIVE to the scanned 
row groups,
+    // not absolute positions in the file.
+    //
+    // Example: If row_groups_to_scan = [0, 2, 3] and each has 100 rows:
+    //   RG0: rows 0-99 (relative to scanned data)
+    //   RG2: rows 100-199 (relative to scanned data, NOT 200-299 in file!)
+    //   RG3: rows 200-299 (relative to scanned data, NOT 300-399 in file!)
     let mut rg_row_ranges: Vec<(usize, usize, usize)> =
-        Vec::with_capacity(rg_metadata.len());
+        Vec::with_capacity(row_groups_to_scan.len());
     let mut current_row = 0;
-    for (rg_idx, rg) in rg_metadata.iter().enumerate() {
+    for &rg_idx in row_groups_to_scan {
+        let rg = &rg_metadata[rg_idx];
         let num_rows = rg.num_rows() as usize;
         rg_row_ranges.push((rg_idx, current_row, current_row + num_rows));
-        current_row += num_rows;
+        current_row += num_rows; // This is relative row number, NOT absolute 
file position
     }
 
     // Map selections to row groups
@@ -82,12 +100,13 @@ pub fn reverse_row_selection(
     }
 
     // Build new selection for reversed row group order
+    // Only iterate over the row groups that are being scanned, in reverse 
order
     let mut reversed_selectors = Vec::new();
-    for rg_idx in (0..rg_metadata.len()).rev() {
+    for &rg_idx in row_groups_to_scan.iter().rev() {
         if let Some(selectors) = rg_selections.get(&rg_idx) {
             reversed_selectors.extend(selectors.iter().cloned());
         } else {
-            // No specific selection for this row group means select all
+            // No specific selection for this row group means select all rows 
in it
             if let Some((_, start, end)) =
                 rg_row_ranges.iter().find(|(idx, _, _)| *idx == rg_idx)
             {
@@ -101,32 +120,30 @@ pub fn reverse_row_selection(
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use crate::ParquetAccessPlan;
+    use crate::RowGroupAccess;
+    use crate::opener::PreparedAccessPlan;
     use arrow::datatypes::{DataType, Field, Schema};
     use bytes::Bytes;
     use parquet::arrow::ArrowWriter;
+    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
     use parquet::file::reader::FileReader;
     use parquet::file::serialized_reader::SerializedFileReader;
     use std::sync::Arc;
 
     /// Helper function to create a ParquetMetaData with specified row group 
sizes
     /// by actually writing a parquet file in memory
-    fn create_test_metadata(row_group_sizes: Vec<i64>) -> ParquetMetaData {
-        // Create a simple schema
+    fn create_test_metadata(
+        row_group_sizes: Vec<i64>,
+    ) -> parquet::file::metadata::ParquetMetaData {
         let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int32, false)]));
-
-        // Create in-memory parquet file with the specified row groups
         let mut buffer = Vec::new();
         {
-            let props = parquet::file::properties::WriterProperties::builder()
-                .set_max_row_group_size(row_group_sizes[0] as usize)
-                .build();
-
+            let props = 
parquet::file::properties::WriterProperties::builder().build();
             let mut writer =
                 ArrowWriter::try_new(&mut buffer, schema.clone(), 
Some(props)).unwrap();
 
             for &size in &row_group_sizes {
-                // Create a batch with the specified number of rows
                 let array = arrow::array::Int32Array::from(vec![1; size as 
usize]);
                 let batch = arrow::record_batch::RecordBatch::try_new(
                     schema.clone(),
@@ -134,34 +151,131 @@ mod tests {
                 )
                 .unwrap();
                 writer.write(&batch).unwrap();
+                writer.flush().unwrap();
             }
             writer.close().unwrap();
         }
 
-        // Read back the metadata
         let bytes = Bytes::from(buffer);
         let reader = SerializedFileReader::new(bytes).unwrap();
         reader.metadata().clone()
     }
 
     #[test]
-    fn test_reverse_simple_selection() {
-        // 3 row groups with 100 rows each
+    fn test_prepared_access_plan_reverse_simple() {
+        // Test: all row groups are scanned, no row selection
+        let metadata = create_test_metadata(vec![100, 100, 100]);
+
+        let access_plan = ParquetAccessPlan::new_all(3);
+        let rg_metadata = metadata.row_groups();
+
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        // Verify original plan
+        assert_eq!(prepared_plan.row_group_indexes, vec![0, 1, 2]);
+
+        // No row selection originally due to scanning all rows
+        assert_eq!(prepared_plan.row_selection, None);
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // Verify row groups are reversed
+        assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]);
+
+        // If no selection originally, after reversal should still select all 
rows,
+        // and the selection should be None
+        assert_eq!(reversed_plan.row_selection, None);
+    }
+
+    #[test]
+    fn test_prepared_access_plan_reverse_with_selection() {
+        // Test: simple row selection that spans multiple row groups
         let metadata = create_test_metadata(vec![100, 100, 100]);
 
-        // Select first 50 rows from first row group
-        let selection =
-            RowSelection::from(vec![RowSelector::select(50), 
RowSelector::skip(250)]);
+        let mut access_plan = ParquetAccessPlan::new_all(3);
+
+        // Select first 50 rows from first row group, skip rest
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::select(50), 
RowSelector::skip(50)]),
+        );
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .filter(|s| !s.skip)
+            .map(|s| s.row_count)
+            .sum();
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .filter(|s| !s.skip)
+            .map(|s| s.row_count)
+            .sum();
+
+        assert_eq!(
+            original_selected, reversed_selected,
+            "Total selected rows should remain the same"
+        );
+    }
+
+    #[test]
+    fn test_prepared_access_plan_reverse_multi_row_group_selection() {
+        // Test: row selection spanning multiple row groups
+        let metadata = create_test_metadata(vec![100, 100, 100]);
 
-        // Verify total selected rows remain the same
-        let original_selected: usize = selection
+        let mut access_plan = ParquetAccessPlan::new_all(3);
+
+        // Create selection that spans RG0 and RG1
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::skip(50), 
RowSelector::select(50)]),
+        );
+        access_plan.scan_selection(
+            1,
+            RowSelection::from(vec![RowSelector::select(50), 
RowSelector::skip(50)]),
+        );
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
@@ -171,25 +285,80 @@ mod tests {
     }
 
     #[test]
-    fn test_reverse_multi_row_group_selection() {
+    fn test_prepared_access_plan_reverse_empty_selection() {
+        // Test: all rows are skipped
         let metadata = create_test_metadata(vec![100, 100, 100]);
 
-        // Select rows spanning multiple row groups
-        let selection = RowSelection::from(vec![
-            RowSelector::skip(50),
-            RowSelector::select(100), // Spans RG0 and RG1
-            RowSelector::skip(150),
-        ]);
+        let mut access_plan = ParquetAccessPlan::new_all(3);
+
+        // Skip all rows in all row groups
+        for i in 0..3 {
+            access_plan
+                .scan_selection(i, 
RowSelection::from(vec![RowSelector::skip(100)]));
+        }
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
 
-        // Verify total selected rows remain the same
-        let original_selected: usize = selection
+        // Should still skip all rows
+        let total_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+
+        assert_eq!(total_selected, 0);
+    }
+
+    #[test]
+    fn test_prepared_access_plan_reverse_different_row_group_sizes() {
+        // Test: row groups with different sizes
+        let metadata = create_test_metadata(vec![50, 150, 100]);
+
+        let mut access_plan = ParquetAccessPlan::new_all(3);
+
+        // Create complex selection pattern
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::skip(25), 
RowSelector::select(25)]),
+        );
+        access_plan.scan_selection(1, 
RowSelection::from(vec![RowSelector::select(150)]));
+        access_plan.scan_selection(
+            2,
+            RowSelection::from(vec![RowSelector::select(50), 
RowSelector::skip(50)]),
+        );
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .filter(|s| !s.skip)
+            .map(|s| s.row_count)
+            .sum();
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
@@ -199,209 +368,655 @@ mod tests {
     }
 
     #[test]
-    fn test_reverse_full_selection() {
-        let metadata = create_test_metadata(vec![100, 100, 100]);
+    fn test_prepared_access_plan_reverse_single_row_group() {
+        // Test: single row group case
+        let metadata = create_test_metadata(vec![100]);
 
-        // Select all rows
-        let selection = RowSelection::from(vec![RowSelector::select(300)]);
+        let mut access_plan = ParquetAccessPlan::new_all(1);
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::select(50), 
RowSelector::skip(50)]),
+        );
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .filter(|s| !s.skip)
+            .map(|s| s.row_count)
+            .sum();
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
 
-        // Should still select all rows, just in reversed row group order
-        let total_selected: usize = reversed
+        // With single row group, row_group_indexes should remain [0]
+        assert_eq!(reversed_plan.row_group_indexes, vec![0]);
+
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
-        assert_eq!(total_selected, 300);
+        assert_eq!(original_selected, reversed_selected);
+        assert_eq!(original_selected, 50);
     }
 
     #[test]
-    fn test_reverse_empty_selection() {
+    fn test_prepared_access_plan_reverse_complex_pattern() {
+        // Test: complex pattern with multiple select/skip segments
         let metadata = create_test_metadata(vec![100, 100, 100]);
 
-        // Skip all rows
-        let selection = RowSelection::from(vec![RowSelector::skip(300)]);
+        let mut access_plan = ParquetAccessPlan::new_all(3);
+
+        // Complex pattern: select some, skip some, select some more
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![
+                RowSelector::select(30),
+                RowSelector::skip(40),
+                RowSelector::select(30),
+            ]),
+        );
+        access_plan.scan_selection(
+            1,
+            RowSelection::from(vec![RowSelector::skip(50), 
RowSelector::select(50)]),
+        );
+        access_plan.scan_selection(2, 
RowSelection::from(vec![RowSelector::select(100)]));
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .filter(|s| !s.skip)
+            .map(|s| s.row_count)
+            .sum();
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
 
-        // Should still skip all rows
-        let total_selected: usize = reversed
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
-        assert_eq!(total_selected, 0);
+        assert_eq!(original_selected, reversed_selected);
+        assert_eq!(original_selected, 210); // 30 + 30 + 50 + 100
     }
 
     #[test]
-    fn test_reverse_with_different_row_group_sizes() {
-        let metadata = create_test_metadata(vec![50, 150, 100]);
+    fn test_prepared_access_plan_reverse_with_skipped_row_groups() {
+        // This is the KEY test case for the bug fix!
+        // Test scenario where some row groups are completely skipped (not in 
scan plan)
+        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
 
-        let selection = RowSelection::from(vec![
-            RowSelector::skip(25),
-            RowSelector::select(200), // Spans all row groups
-            RowSelector::skip(75),
+        // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial), 
RG3 (scan all)
+        // Only row groups [0, 2, 3] are in the scan plan
+        let mut access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan, // RG0
+            RowGroupAccess::Skip, // RG1 - NOT in scan plan!
+            RowGroupAccess::Scan, // RG2
+            RowGroupAccess::Scan, // RG3
         ]);
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
-
-        let original_selected: usize = selection
+        // Add row selections for the scanned row groups
+        // Note: The RowSelection only covers row groups [0, 2, 3] (300 rows 
total)
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::select(100)]), // RG0: all 
100 rows
+        );
+        // RG1 is skipped, no selection needed
+        access_plan.scan_selection(
+            2,
+            RowSelection::from(vec![
+                RowSelector::select(25), // RG2: first 25 rows
+                RowSelector::skip(75),   // RG2: skip last 75 rows
+            ]),
+        );
+        access_plan.scan_selection(
+            3,
+            RowSelection::from(vec![RowSelector::select(100)]), // RG3: all 
100 rows
+        );
+
+        let rg_metadata = metadata.row_groups();
+
+        // Step 1: Create PreparedAccessPlan
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        // Verify original plan
+        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+        assert_eq!(original_selected, 225); // 100 + 25 + 100
+
+        // Step 2: Reverse the plan (this is the production code path)
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // Verify reversed results
+        // Row group order should be reversed: [3, 2, 0]
+        assert_eq!(
+            reversed_plan.row_group_indexes,
+            vec![3, 2, 0],
+            "Row groups should be reversed"
+        );
+
+        // Verify row selection is also correctly reversed
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
-        assert_eq!(original_selected, reversed_selected);
+        assert_eq!(
+            reversed_selected, 225,
+            "Total selected rows should remain the same"
+        );
+
+        // Verify the reversed selection structure
+        // After reversal, the order becomes: RG3, RG2, RG0
+        // - RG3: select(100)
+        // - RG2: select(25), skip(75)  (note: internal order preserved, not 
reversed)
+        // - RG0: select(100)
+        //
+        // After RowSelection::from() merges adjacent selectors of the same 
type:
+        // - RG3's select(100) + RG2's select(25) = select(125)
+        // - RG2's skip(75) remains as skip(75)
+        // - RG0's select(100) remains as select(100)
+        let selectors: Vec<_> = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
+        assert_eq!(selectors.len(), 3);
+
+        // RG3 (100) + RG2 first part (25) merged into select(125)
+        assert!(!selectors[0].skip);
+        assert_eq!(selectors[0].row_count, 125);
+
+        // RG2: skip last 75 rows
+        assert!(selectors[1].skip);
+        assert_eq!(selectors[1].row_count, 75);
+
+        // RG0: select all 100 rows
+        assert!(!selectors[2].skip);
+        assert_eq!(selectors[2].row_count, 100);
     }
 
     #[test]
-    fn test_reverse_single_row_group() {
-        let metadata = create_test_metadata(vec![100]);
+    fn test_prepared_access_plan_reverse_alternating_row_groups() {
+        // Test with alternating scan/skip pattern
+        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
 
-        let selection =
-            RowSelection::from(vec![RowSelector::select(50), 
RowSelector::skip(50)]);
+        // Scan RG0 and RG2, skip RG1 and RG3
+        let mut access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan, // RG0
+            RowGroupAccess::Skip, // RG1
+            RowGroupAccess::Scan, // RG2
+            RowGroupAccess::Skip, // RG3
+        ]);
+
+        access_plan.scan_selection(0, 
RowSelection::from(vec![RowSelector::select(100)]));
+        access_plan.scan_selection(2, 
RowSelection::from(vec![RowSelector::select(100)]));
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
 
-        // With single row group, selection should remain the same
-        let original_selected: usize = selection
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+
+        // Original: [0, 2]
+        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // After reverse: [2, 0]
+        assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]);
+
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
         assert_eq!(original_selected, reversed_selected);
+        assert_eq!(original_selected, 200);
     }
 
     #[test]
-    fn test_reverse_complex_pattern() {
+    fn test_prepared_access_plan_reverse_middle_row_group_only() {
+        // Test selecting only the middle row group
         let metadata = create_test_metadata(vec![100, 100, 100]);
 
-        // Complex pattern: select some, skip some, select some more
-        let selection = RowSelection::from(vec![
-            RowSelector::select(30),
-            RowSelector::skip(40),
-            RowSelector::select(80),
-            RowSelector::skip(50),
-            RowSelector::select(100),
+        let mut access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Skip, // RG0
+            RowGroupAccess::Scan, // RG1
+            RowGroupAccess::Skip, // RG2
         ]);
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        access_plan.scan_selection(
+            1,
+            RowSelection::from(vec![RowSelector::select(100)]), // Select all 
of RG1
+        );
 
-        let original_selected: usize = selection
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        let original_selected: usize = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+
+        // Original: [1]
+        assert_eq!(prepared_plan.row_group_indexes, vec![1]);
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // After reverse: still [1] (only one row group)
+        assert_eq!(reversed_plan.row_group_indexes, vec![1]);
+
+        let reversed_selected: usize = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
         assert_eq!(original_selected, reversed_selected);
-        assert_eq!(original_selected, 210); // 30 + 80 + 100
+        assert_eq!(original_selected, 100);
     }
 
     #[test]
-    fn test_reverse_with_skipped_row_group() {
-        // This test covers the "no specific selection" code path (lines 90-95)
-        let metadata = create_test_metadata(vec![100, 100, 100]);
+    fn test_prepared_access_plan_reverse_with_skipped_row_groups_detailed() {
+        // This is the KEY test case for the bug fix!
+        // Test scenario where some row groups are completely skipped (not in scan plan)
+        // This version includes DETAILED verification of the selector distribution
+        let metadata = create_test_metadata(vec![100, 100, 100, 100]);
 
-        // Select only from first and third row groups, skip middle one entirely
-        let selection = RowSelection::from(vec![
-            RowSelector::select(50), // First 50 of RG0
-            RowSelector::skip(150),  // Rest of RG0 + all of RG1 + half of RG2
-            RowSelector::select(50), // Last 50 of RG2
+        // Scenario: RG0 (scan all), RG1 (completely skipped), RG2 (partial), RG3 (scan all)
+        // Only row groups [0, 2, 3] are in the scan plan
+        let mut access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan, // RG0
+            RowGroupAccess::Skip, // RG1 - NOT in scan plan!
+            RowGroupAccess::Scan, // RG2
+            RowGroupAccess::Scan, // RG3
         ]);
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
-
-        // Verify total selected rows remain the same
-        let original_selected: usize = selection
+        // Add row selections for the scanned row groups
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::select(100)]), // RG0: all 
100 rows
+        );
+        // RG1 is skipped, no selection needed
+        access_plan.scan_selection(
+            2,
+            RowSelection::from(vec![
+                RowSelector::select(25), // RG2: first 25 rows
+                RowSelector::skip(75),   // RG2: skip last 75 rows
+            ]),
+        );
+        access_plan.scan_selection(
+            3,
+            RowSelection::from(vec![RowSelector::select(100)]), // RG3: all 
100 rows
+        );
+
+        let rg_metadata = metadata.row_groups();
+
+        // Step 1: Create PreparedAccessPlan
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        // Verify original plan in detail
+        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2, 3]);
+
+        // Detailed verification of original selection
+        let orig_selectors: Vec<_> = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
+
+        // Original structure should be:
+        // RG0: select(100)
+        // RG2: select(25), skip(75)
+        // RG3: select(100)
+        // After merging by RowSelection::from(): select(125), skip(75), select(100)
+        assert_eq!(
+            orig_selectors.len(),
+            3,
+            "Original should have 3 selectors after merging"
+        );
+        assert!(
+            !orig_selectors[0].skip && orig_selectors[0].row_count == 125,
+            "Original: First selector should be select(125) from RG0(100) + 
RG2(25)"
+        );
+        assert!(
+            orig_selectors[1].skip && orig_selectors[1].row_count == 75,
+            "Original: Second selector should be skip(75) from RG2"
+        );
+        assert!(
+            !orig_selectors[2].skip && orig_selectors[2].row_count == 100,
+            "Original: Third selector should be select(100) from RG3"
+        );
+
+        let original_selected: usize = orig_selectors
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+        assert_eq!(original_selected, 225); // 100 + 25 + 100
+
+        // Step 2: Reverse the plan (this is the production code path)
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // Verify reversed results
+        // Row group order should be reversed: [3, 2, 0]
+        assert_eq!(
+            reversed_plan.row_group_indexes,
+            vec![3, 2, 0],
+            "Row groups should be reversed"
+        );
+
+        // Detailed verification of reversed selection
+        let rev_selectors: Vec<_> = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
+
+        // After reversal, the order becomes: RG3, RG2, RG0
+        // - RG3: select(100)
+        // - RG2: select(25), skip(75)  (note: internal order preserved, not reversed)
+        // - RG0: select(100)
+        //
+        // After RowSelection::from() merges adjacent selectors of the same type:
+        // - RG3's select(100) + RG2's select(25) = select(125)
+        // - RG2's skip(75) remains as skip(75)
+        // - RG0's select(100) remains as select(100)
+
+        assert_eq!(
+            rev_selectors.len(),
+            3,
+            "Reversed should have 3 selectors after merging"
+        );
+
+        // First selector: RG3 (100) + RG2 first part (25) merged into select(125)
+        assert!(
+            !rev_selectors[0].skip && rev_selectors[0].row_count == 125,
+            "Reversed: First selector should be select(125) from RG3(100) + 
RG2(25), got skip={} count={}",
+            rev_selectors[0].skip,
+            rev_selectors[0].row_count
+        );
+
+        // Second selector: RG2 skip last 75 rows
+        assert!(
+            rev_selectors[1].skip && rev_selectors[1].row_count == 75,
+            "Reversed: Second selector should be skip(75) from RG2, got 
skip={} count={}",
+            rev_selectors[1].skip,
+            rev_selectors[1].row_count
+        );
+
+        // Third selector: RG0 select all 100 rows
+        assert!(
+            !rev_selectors[2].skip && rev_selectors[2].row_count == 100,
+            "Reversed: Third selector should be select(100) from RG0, got 
skip={} count={}",
+            rev_selectors[2].skip,
+            rev_selectors[2].row_count
+        );
+
+        // Verify row selection is also correctly reversed (total count)
+        let reversed_selected: usize = rev_selectors
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
-        assert_eq!(original_selected, reversed_selected);
-        assert_eq!(original_selected, 100); // 50 + 50
+        assert_eq!(
+            reversed_selected, 225,
+            "Total selected rows should remain the same"
+        );
     }
 
     #[test]
-    fn test_reverse_middle_row_group_only() {
-        // Another test to ensure skipped row groups are handled correctly
+    fn test_prepared_access_plan_reverse_complex_pattern_detailed() {
+        // Test: complex pattern with detailed verification
         let metadata = create_test_metadata(vec![100, 100, 100]);
 
-        // Select only middle row group
-        let selection = RowSelection::from(vec![
-            RowSelector::skip(100),   // Skip RG0
-            RowSelector::select(100), // Select all of RG1
-            RowSelector::skip(100),   // Skip RG2
-        ]);
+        let mut access_plan = ParquetAccessPlan::new_all(3);
+
+        // Complex pattern: select some, skip some, select some more
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![
+                RowSelector::select(30),
+                RowSelector::skip(40),
+                RowSelector::select(30),
+            ]),
+        );
+        access_plan.scan_selection(
+            1,
+            RowSelection::from(vec![RowSelector::skip(50), 
RowSelector::select(50)]),
+        );
+        access_plan.scan_selection(2, 
RowSelection::from(vec![RowSelector::select(100)]));
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        // Verify original selection structure in detail
+        let orig_selectors: Vec<_> = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        // RG0: select(30), skip(40), select(30)
+        // RG1: skip(50), select(50)
+        // RG2: select(100)
+        // Sequential: sel(30), skip(40), sel(30), skip(50), sel(50), sel(100)
+        // After merge: sel(30), skip(40), sel(30), skip(50), sel(150)
 
-        let original_selected: usize = selection
+        let original_selected: usize = orig_selectors
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+        assert_eq!(original_selected, 210); // 30 + 30 + 50 + 100
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // Verify reversed selection structure
+        let rev_selectors: Vec<_> = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
+
+        // After reversal: RG2, RG1, RG0
+        // RG2: select(100)
+        // RG1: skip(50), select(50)
+        // RG0: select(30), skip(40), select(30)
+        // Sequential: sel(100), skip(50), sel(50), sel(30), skip(40), sel(30)
+        // After merge: sel(100), skip(50), sel(80), skip(40), sel(30)
+
+        let reversed_selected: usize = rev_selectors
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
-        assert_eq!(original_selected, reversed_selected);
-        assert_eq!(original_selected, 100);
+        assert_eq!(
+            reversed_selected, 210,
+            "Total selected rows should remain the same (30 + 30 + 50 + 100)"
+        );
+
+        // Verify row group order
+        assert_eq!(reversed_plan.row_group_indexes, vec![2, 1, 0]);
     }
 
     #[test]
-    fn test_reverse_alternating_row_groups() {
-        // Test with more complex skipping pattern
+    fn test_prepared_access_plan_reverse_alternating_detailed() {
+        // Test with alternating scan/skip pattern with detailed verification
         let metadata = create_test_metadata(vec![100, 100, 100, 100]);
 
-        // Select first and third row groups, skip second and fourth
-        let selection = RowSelection::from(vec![
-            RowSelector::select(100), // RG0
-            RowSelector::skip(100),   // RG1
-            RowSelector::select(100), // RG2
-            RowSelector::skip(100),   // RG3
+        // Scan RG0 and RG2, skip RG1 and RG3
+        let mut access_plan = ParquetAccessPlan::new(vec![
+            RowGroupAccess::Scan, // RG0
+            RowGroupAccess::Skip, // RG1
+            RowGroupAccess::Scan, // RG2
+            RowGroupAccess::Skip, // RG3
         ]);
 
-        let reversed = reverse_row_selection(&selection, &metadata).unwrap();
+        access_plan.scan_selection(
+            0,
+            RowSelection::from(vec![RowSelector::select(30), 
RowSelector::skip(70)]),
+        );
+        access_plan.scan_selection(
+            2,
+            RowSelection::from(vec![RowSelector::skip(20), 
RowSelector::select(80)]),
+        );
+
+        let rg_metadata = metadata.row_groups();
+        let prepared_plan =
+            PreparedAccessPlan::from_access_plan(access_plan, rg_metadata)
+                .expect("Failed to create PreparedAccessPlan");
+
+        // Original: [0, 2]
+        assert_eq!(prepared_plan.row_group_indexes, vec![0, 2]);
+
+        // Verify original selection
+        let orig_selectors: Vec<_> = prepared_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
+
+        // Original:
+        // RG0: select(30), skip(70)
+        // RG2: skip(20), select(80)
+        // Sequential: sel(30), skip(90), sel(80)
+        //   (RG0's skip(70) + RG2's skip(20) = skip(90))
 
-        let original_selected: usize = selection
+        let original_selected: usize = orig_selectors
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
-        let reversed_selected: usize = reversed
+        assert_eq!(original_selected, 110); // 30 + 80
+
+        let reversed_plan = prepared_plan
+            .reverse(&metadata)
+            .expect("Failed to reverse PreparedAccessPlan");
+
+        // After reverse: [2, 0]
+        assert_eq!(reversed_plan.row_group_indexes, vec![2, 0]);
+
+        // Verify reversed selection
+        let rev_selectors: Vec<_> = reversed_plan
+            .row_selection
+            .as_ref()
+            .unwrap()
+            .iter()
+            .collect();
+
+        // After reversal: RG2, RG0
+        // RG2: skip(20), select(80)
+        // RG0: select(30), skip(70)
+        // Sequential: skip(20), sel(110), skip(70)
+        //   (RG2's select(80) + RG0's select(30) = select(110))
+
+        let reversed_selected: usize = rev_selectors
             .iter()
             .filter(|s| !s.skip)
             .map(|s| s.row_count)
             .sum();
 
-        assert_eq!(original_selected, reversed_selected);
-        assert_eq!(original_selected, 200);
+        assert_eq!(reversed_selected, 110); // Should still be 30 + 80
+
+        // Detailed verification of structure
+        assert_eq!(rev_selectors.len(), 3, "Reversed should have 3 selectors");
+
+        assert!(
+            rev_selectors[0].skip && rev_selectors[0].row_count == 20,
+            "First selector should be skip(20) from RG2"
+        );
+
+        assert!(
+            !rev_selectors[1].skip && rev_selectors[1].row_count == 110,
+            "Second selector should be select(110) from RG2(80) + RG0(30)"
+        );
+
+        assert!(
+            rev_selectors[2].skip && rev_selectors[2].row_count == 70,
+            "Third selector should be skip(70) from RG0"
+        );
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to