Copilot commented on code in PR #9369:
URL: https://github.com/apache/arrow-rs/pull/9369#discussion_r2773385749
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
continue;
}
- let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
let read =
self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
));
}
- if read != mask_chunk.chunk_rows {
- return Err(general_err!(
- "insufficient rows read from array reader -
expected {}, got {}",
- mask_chunk.chunk_rows,
- read
- ));
- }
let array = self.array_reader.consume_batch()?;
- // The column reader exposes the projection as a struct
array; convert this
- // into a record batch before applying the boolean filter
mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct
array".to_string(),
)
})?;
+ // Key Change: partial read → emit immediately, no mask
+ if read < mask_chunk.chunk_rows {
+ return Ok(Some(RecordBatch::from(struct_array)));
+ }
+
+ // Full read , safe to apply mask
Review Comment:
Useful explanatory comments that described the high-level logic of the Mask
cursor path were removed. While the code changes are legitimate, removing
documentation that explains "why" the code works a certain way reduces
maintainability. Consider restoring or updating the comment that explains "The
column reader exposes the projection as a struct array; convert this into a
record batch before applying the boolean filter mask" near line 1405 where the
conversion happens, as this helps future maintainers understand the data flow.
```suggestion
// The column reader exposes the projected columns as a
single StructArray.
// Convert this struct array into a RecordBatch before
applying the boolean
// filter mask so we can use the standard
RecordBatch-level filter utilities.
// Key Change: partial read → emit immediately, no mask
if read < mask_chunk.chunk_rows {
return Ok(Some(RecordBatch::from(struct_array)));
}
// Full read, safe to apply mask
```
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
continue;
}
- let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
let read =
self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
));
}
- if read != mask_chunk.chunk_rows {
- return Err(general_err!(
- "insufficient rows read from array reader -
expected {}, got {}",
- mask_chunk.chunk_rows,
- read
- ));
- }
let array = self.array_reader.consume_batch()?;
- // The column reader exposes the projection as a struct
array; convert this
- // into a record batch before applying the boolean filter
mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct
array".to_string(),
)
})?;
+ // Key Change: partial read → emit immediately, no mask
+ if read < mask_chunk.chunk_rows {
+ return Ok(Some(RecordBatch::from(struct_array)));
+ }
+
+ // Full read , safe to apply mask
Review Comment:
When a partial read occurs (read < mask_chunk.chunk_rows), this code returns
the unfiltered struct_array without applying the boolean mask. This means rows
that should have been filtered out according to the row selection mask will be
incorrectly included in the returned RecordBatch.
The mask_cursor expects that mask_chunk.chunk_rows were read, but only
'read' rows were actually decoded. The mask should still be applied, but it
needs to be sized/sliced appropriately for the partial read, or the mask_cursor
state needs to be adjusted to reflect only the rows that were actually read.
```suggestion
// Always apply the selection mask; for partial reads,
slice it down to `read`
if read < mask_chunk.chunk_rows {
let full_mask =
mask_cursor.mask_values_for(&mask_chunk)?;
let sliced_mask = full_mask.slice(0, read);
let sliced_mask = sliced_mask.as_boolean();
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array), sliced_mask)?;
if filtered_batch.num_rows() == 0 {
continue;
}
return Ok(Some(filtered_batch));
}
// Full read, apply mask directly
```
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1472,14 +1458,24 @@ impl ParquetRecordBatchReader {
}
_ => front.row_count,
};
- match self.array_reader.read_records(to_read)? {
- 0 => break,
- rec => read_records += rec,
- };
+ let rec = self.array_reader.read_records(to_read)?;
+ if rec == 0 {
+ break;
+ }
+
+ read_records += rec;
+
+ // stop early if we couldn't read everything requested
+ if rec < to_read {
+ break;
+ }
}
}
RowSelectionCursor::All => {
- self.array_reader.read_records(batch_size)?;
+ let rec = self.array_reader.read_records(batch_size)?;
+ if rec == 0 {
+ return Ok(None);
Review Comment:
The three cursor paths handle the case where read_records returns 0
inconsistently:
- Mask path: Returns an error "reached end of column while expecting X rows"
(lines 1397-1402)
- Selectors path: Breaks the loop and proceeds to consume_batch (lines
1462-1463)
- All path: Returns Ok(None) immediately without calling consume_batch
(lines 1476-1478)
This inconsistency could lead to different behavior in edge cases. Consider
whether all three paths should handle EOF consistently, either by returning
None or by proceeding to consume_batch to check if there's any buffered data.
The early return in the All path might skip consuming any previously buffered
data, although this may be an intentional optimization.
```suggestion
break;
```
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1472,14 +1458,24 @@ impl ParquetRecordBatchReader {
}
_ => front.row_count,
};
- match self.array_reader.read_records(to_read)? {
- 0 => break,
- rec => read_records += rec,
- };
+ let rec = self.array_reader.read_records(to_read)?;
+ if rec == 0 {
+ break;
+ }
+
+ read_records += rec;
+
+ // stop early if we couldn't read everything requested
+ if rec < to_read {
Review Comment:
When a partial read occurs (rec < to_read), this code breaks and returns a
partial batch. However, if line 1456 executed, we've already pushed `remaining`
rows back onto the selectors_cursor via return_selector(). This means those
rows are queued for the next read, but we haven't actually advanced the cursor
properly for the rows we DID read. The cursor state may become inconsistent
because:
1. If to_read was less than front.row_count, we pushed back `remaining` rows
2. But we only read `rec` rows, which may be less than `to_read`
3. The cursor doesn't know about the (to_read - rec) rows that weren't
consumed
Consider handling the partial read case by calculating how many rows were
actually consumed and adjusting the selector cursor accordingly, possibly by
calling return_selector with the unconsumed count.
```suggestion
if rec < to_read {
let unconsumed = to_read - rec;
if unconsumed > 0 {
selectors_cursor.return_selector(RowSelector::select(unconsumed));
}
```
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
continue;
}
- let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
let read =
self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
));
}
- if read != mask_chunk.chunk_rows {
- return Err(general_err!(
- "insufficient rows read from array reader -
expected {}, got {}",
- mask_chunk.chunk_rows,
- read
- ));
- }
let array = self.array_reader.consume_batch()?;
- // The column reader exposes the projection as a struct
array; convert this
- // into a record batch before applying the boolean filter
mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct
array".to_string(),
)
})?;
+ // Key Change: partial read → emit immediately, no mask
+ if read < mask_chunk.chunk_rows {
+ return Ok(Some(RecordBatch::from(struct_array)));
+ }
+
+ // Full read , safe to apply mask
Review Comment:
There's a spacing error in this comment: "Full read , safe" should be "Full
read, safe" (remove the extra space before the comma).
```suggestion
// Full read, safe to apply mask
```
##########
parquet/src/arrow/arrow_reader/mod.rs:
##########
@@ -1395,43 +1393,31 @@ impl ParquetRecordBatchReader {
continue;
}
- let mask = mask_cursor.mask_values_for(&mask_chunk)?;
-
let read =
self.array_reader.read_records(mask_chunk.chunk_rows)?;
if read == 0 {
return Err(general_err!(
"reached end of column while expecting {} rows",
mask_chunk.chunk_rows
));
}
- if read != mask_chunk.chunk_rows {
- return Err(general_err!(
- "insufficient rows read from array reader -
expected {}, got {}",
- mask_chunk.chunk_rows,
- read
- ));
- }
let array = self.array_reader.consume_batch()?;
- // The column reader exposes the projection as a struct
array; convert this
- // into a record batch before applying the boolean filter
mask.
let struct_array = array.as_struct_opt().ok_or_else(|| {
ArrowError::ParquetError(
"Struct array reader should return struct
array".to_string(),
)
})?;
+ // Key Change: partial read → emit immediately, no mask
+ if read < mask_chunk.chunk_rows {
+ return Ok(Some(RecordBatch::from(struct_array)));
+ }
+
+ // Full read , safe to apply mask
+ let mask = mask_cursor.mask_values_for(&mask_chunk)?;
let filtered_batch =
filter_record_batch(&RecordBatch::from(struct_array),
&mask)?;
Review Comment:
The removed validation that checked `filtered_batch.num_rows() ==
mask_chunk.selected_rows` was an important consistency check. While this check
needs to be skipped for partial reads, it should still be performed for full
reads to catch data inconsistencies. Consider restoring this check after line
1419, within the full read path, to maintain data integrity validation.
```suggestion
// For full reads, ensure the mask and filtered output
are consistent
if filtered_batch.num_rows() != mask_chunk.selected_rows
{
return Err(general_err!(
"row filter inconsistency: expected {} rows, got
{}",
mask_chunk.selected_rows,
filtered_batch.num_rows()
));
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]