Copilot commented on code in PR #9369:
URL: https://github.com/apache/arrow-rs/pull/9369#discussion_r2773385749


##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
                         continue;
                     }
 
-                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
                     let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
                     if read == 0 {
                         return Err(general_err!(
                             "reached end of column while expecting {} rows",
                             mask_chunk.chunk_rows
                         ));
                     }
-                    if read != mask_chunk.chunk_rows {
-                        return Err(general_err!(
-                            "insufficient rows read from array reader - 
expected {}, got {}",
-                            mask_chunk.chunk_rows,
-                            read
-                        ));
-                    }
 
                     let array = self.array_reader.consume_batch()?;
-                    // The column reader exposes the projection as a struct 
array; convert this
-                    // into a record batch before applying the boolean filter 
mask.
                     let struct_array = array.as_struct_opt().ok_or_else(|| {
                         ArrowError::ParquetError(
                             "Struct array reader should return struct 
array".to_string(),
                         )
                     })?;
 
+                    // Key Change: partial read → emit immediately, no mask
+                    if read < mask_chunk.chunk_rows {
+                        return Ok(Some(RecordBatch::from(struct_array)));
+                    }
+
+                    // Full read , safe to apply mask

Review Comment:
   Useful explanatory comments were removed that described the high-level logic 
of the Mask cursor path. While the code changes are legitimate, removing 
documentation that explains "why" the code works a certain way reduces 
maintainability. Consider restoring or updating the comment that explains "The 
column reader exposes the projection as a struct array; convert this into a 
record batch before applying the boolean filter mask" near line 1405 where the 
conversion happens, as this helps future maintainers understand the data flow.
   ```suggestion
                       // The column reader exposes the projected columns as a 
single StructArray.
                       // Convert this struct array into a RecordBatch before 
applying the boolean
                       // filter mask so we can use the standard 
RecordBatch-level filter utilities.
                       // Key Change: partial read → emit immediately, no mask
                       if read < mask_chunk.chunk_rows {
                           return Ok(Some(RecordBatch::from(struct_array)));
                       }
   
                       // Full read, safe to apply mask
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
                         continue;
                     }
 
-                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
                     let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
                     if read == 0 {
                         return Err(general_err!(
                             "reached end of column while expecting {} rows",
                             mask_chunk.chunk_rows
                         ));
                     }
-                    if read != mask_chunk.chunk_rows {
-                        return Err(general_err!(
-                            "insufficient rows read from array reader - 
expected {}, got {}",
-                            mask_chunk.chunk_rows,
-                            read
-                        ));
-                    }
 
                     let array = self.array_reader.consume_batch()?;
-                    // The column reader exposes the projection as a struct 
array; convert this
-                    // into a record batch before applying the boolean filter 
mask.
                     let struct_array = array.as_struct_opt().ok_or_else(|| {
                         ArrowError::ParquetError(
                             "Struct array reader should return struct 
array".to_string(),
                         )
                     })?;
 
+                    // Key Change: partial read → emit immediately, no mask
+                    if read < mask_chunk.chunk_rows {
+                        return Ok(Some(RecordBatch::from(struct_array)));
+                    }
+
+                    // Full read , safe to apply mask

Review Comment:
   When a partial read occurs (read < mask_chunk.chunk_rows), this code returns 
the unfiltered struct_array without applying the boolean mask. This means rows 
that should have been filtered out according to the row selection mask will be 
incorrectly included in the returned RecordBatch. 
   
   The mask_cursor expects that mask_chunk.chunk_rows were read, but only 
'read' rows were actually decoded. The mask should still be applied, but it 
needs to be sized/sliced appropriately for the partial read, or the mask_cursor 
state needs to be adjusted to reflect only the rows that were actually read.
   ```suggestion
                       // Always apply the selection mask; for partial reads, 
slice it down to `read`
                       if read < mask_chunk.chunk_rows {
                           let full_mask = 
mask_cursor.mask_values_for(&mask_chunk)?;
                           let sliced_mask = full_mask.slice(0, read);
                           let sliced_mask = sliced_mask.as_boolean();
   
                           let filtered_batch =
                               
filter_record_batch(&RecordBatch::from(struct_array), sliced_mask)?;
   
                           if filtered_batch.num_rows() == 0 {
                               continue;
                           }
   
                           return Ok(Some(filtered_batch));
                       }
   
                       // Full read, apply mask directly
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1472,14 +1458,24 @@ impl ParquetRecordBatchReader {
                         }
                         _ => front.row_count,
                     };
-                    match self.array_reader.read_records(to_read)? {
-                        0 => break,
-                        rec => read_records += rec,
-                    };
+                    let rec = self.array_reader.read_records(to_read)?;
+                    if rec == 0 {
+                        break;
+                    }
+
+                    read_records += rec;
+
+                    // stop early if we couldn't read everything requested
+                    if rec < to_read {
+                        break;
+                    }
                 }
             }
             RowSelectionCursor::All => {
-                self.array_reader.read_records(batch_size)?;
+                let rec = self.array_reader.read_records(batch_size)?;
+                if rec == 0 {
+                    return Ok(None);

Review Comment:
   The three cursor paths handle the case where read_records returns 0 
inconsistently:
   - Mask path: Returns an error "reached end of column while expecting X rows" 
(lines 1397-1402)
   - Selectors path: Breaks the loop and proceeds to consume_batch (line 
1462-1463)
   - All path: Returns Ok(None) immediately without calling consume_batch 
(lines 1476-1478)
   
   This inconsistency could lead to different behavior in edge cases. Consider 
whether all three paths should handle EOF consistently, either by returning 
None or by proceeding to consume_batch to check if there's any buffered data. 
The early return in the All path might skip consuming any previously buffered 
data, although this may be intentional optimization.
   ```suggestion
                       break;
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1472,14 +1458,24 @@ impl ParquetRecordBatchReader {
                         }
                         _ => front.row_count,
                     };
-                    match self.array_reader.read_records(to_read)? {
-                        0 => break,
-                        rec => read_records += rec,
-                    };
+                    let rec = self.array_reader.read_records(to_read)?;
+                    if rec == 0 {
+                        break;
+                    }
+
+                    read_records += rec;
+
+                    // stop early if we couldn't read everything requested
+                    if rec < to_read {

Review Comment:
   When a partial read occurs (rec < to_read), this code breaks and returns a 
partial batch. However, if line 1456 executed, we've already pushed `remaining` 
rows back onto the selectors_cursor via return_selector(). This means those 
rows are queued for the next read, but we haven't actually advanced the cursor 
properly for the rows we DID read. The cursor state may become inconsistent 
because:
   1. If to_read was less than front.row_count, we pushed back `remaining` rows
   2. But we only read `rec` rows, which may be less than `to_read`
   3. The cursor doesn't know about the (to_read - rec) rows that weren't 
consumed
   
   Consider handling the partial read case by calculating how many rows were 
actually consumed and adjusting the selector cursor accordingly, possibly by 
calling return_selector with the unconsumed count.
   ```suggestion
                       if rec < to_read {
                           let unconsumed = to_read - rec;
                           if unconsumed > 0 {
                               
selectors_cursor.return_selector(RowSelector::select(unconsumed));
                           }
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
                         continue;
                     }
 
-                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
                     let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
                     if read == 0 {
                         return Err(general_err!(
                             "reached end of column while expecting {} rows",
                             mask_chunk.chunk_rows
                         ));
                     }
-                    if read != mask_chunk.chunk_rows {
-                        return Err(general_err!(
-                            "insufficient rows read from array reader - 
expected {}, got {}",
-                            mask_chunk.chunk_rows,
-                            read
-                        ));
-                    }
 
                     let array = self.array_reader.consume_batch()?;
-                    // The column reader exposes the projection as a struct 
array; convert this
-                    // into a record batch before applying the boolean filter 
mask.
                     let struct_array = array.as_struct_opt().ok_or_else(|| {
                         ArrowError::ParquetError(
                             "Struct array reader should return struct 
array".to_string(),
                         )
                     })?;
 
+                    // Key Change: partial read → emit immediately, no mask
+                    if read < mask_chunk.chunk_rows {
+                        return Ok(Some(RecordBatch::from(struct_array)));
+                    }
+
+                    // Full read , safe to apply mask

Review Comment:
   There's a spacing error in this comment: "Full read , safe" should be "Full 
read, safe" (remove the extra space before the comma).
   ```suggestion
                       // Full read, safe to apply mask
   ```



##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
                         continue;
                     }
 
-                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
                     let read = 
self.array_reader.read_records(mask_chunk.chunk_rows)?;
                     if read == 0 {
                         return Err(general_err!(
                             "reached end of column while expecting {} rows",
                             mask_chunk.chunk_rows
                         ));
                     }
-                    if read != mask_chunk.chunk_rows {
-                        return Err(general_err!(
-                            "insufficient rows read from array reader - 
expected {}, got {}",
-                            mask_chunk.chunk_rows,
-                            read
-                        ));
-                    }
 
                     let array = self.array_reader.consume_batch()?;
-                    // The column reader exposes the projection as a struct 
array; convert this
-                    // into a record batch before applying the boolean filter 
mask.
                     let struct_array = array.as_struct_opt().ok_or_else(|| {
                         ArrowError::ParquetError(
                             "Struct array reader should return struct 
array".to_string(),
                         )
                     })?;
 
+                    // Key Change: partial read → emit immediately, no mask
+                    if read < mask_chunk.chunk_rows {
+                        return Ok(Some(RecordBatch::from(struct_array)));
+                    }
+
+                    // Full read , safe to apply mask
+                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;
                     let filtered_batch =
                         filter_record_batch(&RecordBatch::from(struct_array), 
&mask)?;
 

Review Comment:
   The removed validation that checked `filtered_batch.num_rows() == 
mask_chunk.selected_rows` was an important consistency check. While this check 
needs to be skipped for partial reads, it should still be performed for full 
reads to catch data inconsistencies. Consider restoring this check after line 
1419, within the full read path, to maintain data integrity validation.
   ```suggestion
   
                       // For full reads, ensure the mask and filtered output 
are consistent
                       if filtered_batch.num_rows() != mask_chunk.selected_rows 
{
                           return Err(general_err!(
                               "row filter inconsistency: expected {} rows, got 
{}",
                               mask_chunk.selected_rows,
                               filtered_batch.num_rows()
                           ));
                       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to