This is an automated email from the ASF dual-hosted git repository.

Jefffrey pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new f03e1bc028 fix(ipc): handle duplicate projection indices in IPC reader 
(#9952)
f03e1bc028 is described below

commit f03e1bc028630561af7d4adfc000916337ec6ce5
Author: pchintar <[email protected]>
AuthorDate: Wed Jun 3 06:06:16 2026 +0530

    fix(ipc): handle duplicate projection indices in IPC reader (#9952)
    
    # Which issue does this PR close?
    
    - Closes #9950.
    
    # Rationale for this change
    
    The current IPC reader does not correctly handle duplicate projection
    indices.
    
    `Schema::project` (in `arrow-schema/src/schema.rs`) and
    `RecordBatch::project` (in `arrow-array/src/record_batch.rs`) both map
    each requested index directly, preserve the projection order, and allow
    duplicate indices such as:
    
    ```rust
    vec![1, 1]
    ```
    
    However, the IPC reader currently uses:
    
    ```rust
    projection.iter().position(|p| p == &idx)
    ```
    
    which returns only the first matching entry. As a result, only one
    column is decoded even though the projected schema contains multiple
    fields, leading to schema/column count mismatches when constructing the
    `RecordBatch`.
    
    This also affects reordered duplicate projections such as:
    
    ```rust
    vec![2, 0, 2]
    ```
    
    # What changes are included in this PR?
    
    * Updated IPC projection handling in `arrow-ipc/src/reader.rs` to
    preserve all matching projection entries
    * Reused the decoded array for duplicate projection indices instead of
    decoding the same field multiple times
    * Preserved projection order for reordered duplicate projections
    
    # Are these changes tested?
    
    Yes.
    
    Added `test_projection_duplicate_indices`, which verifies:
    
    * duplicate projections (`vec![1, 1]`)
    * reordered duplicate projections (`vec![2, 0, 2]`)
    
    The test compares IPC projection results against `RecordBatch::project`.
    
    The test fails before the fix and passes after it.
    
    All existing `arrow-ipc` tests also pass with `cargo test -p arrow-ipc --lib`.
    
    # Are there any user-facing changes?
    
    No.
---
 arrow-ipc/src/reader.rs | 43 ++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 38 insertions(+), 5 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6d1466febf..6d1e799d43 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -560,11 +560,20 @@ impl<'a> RecordBatchDecoder<'a> {
             let mut arrays = vec![];
             // project fields
             for (idx, field) in schema.fields().iter().enumerate() {
-                // Create array for projected field
-                if let Some(proj_idx) = projection.iter().position(|p| p == 
&idx) {
-                    let child = self.create_array(field, &mut 
variadic_counts)?;
-                    arrays.push((proj_idx, child));
-                } else {
+                // A projected field can appear more than once, so collect all 
matching positions.
+                let mut child = None;
+                for (proj_idx, projected_idx) in projection.iter().enumerate() 
{
+                    if *projected_idx == idx {
+                        if child.is_none() {
+                            child = Some(self.create_array(field, &mut 
variadic_counts)?);
+                        }
+
+                        // Reuse the decoded array for duplicate projection 
entries.
+                        arrays.push((proj_idx, 
child.as_ref().unwrap().clone()));
+                    }
+                }
+
+                if child.is_none() {
                     self.skip_field(field, &mut variadic_counts)?;
                 }
             }
@@ -2297,6 +2306,30 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_projection_duplicate_indices() {
+        let schema = create_test_projection_schema();
+        let batch = create_test_projection_batch_data(&schema);
+
+        // Write the batch to IPC
+        let mut buf = Vec::new();
+        {
+            let mut writer = crate::writer::FileWriter::try_new(&mut buf, 
&schema).unwrap();
+            writer.write(&batch).unwrap();
+            writer.finish().unwrap();
+        }
+
+        // Verify duplicate([1, 1]) and reordered([2, 0, 2]) projection indices
+        for projection in [vec![1, 1], vec![2, 0, 2]] {
+            let reader =
+                FileReader::try_new(std::io::Cursor::new(buf.clone()), 
Some(projection.clone()));
+            let read_batch = reader.unwrap().next().unwrap().unwrap();
+
+            let expected_batch = batch.project(&projection).unwrap();
+            assert_eq!(read_batch, expected_batch);
+        }
+    }
+
     #[test]
     fn test_arrow_single_float_row() {
         let schema = Schema::new(vec![

Reply via email to