tustvold commented on code in PR #2359:
URL: https://github.com/apache/arrow-rs/pull/2359#discussion_r939986090
##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -287,43 +287,59 @@ impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
- let to_read = match self.selection.as_mut() {
- Some(selection) => loop {
- let front = selection.pop_front()?;
- if front.skip {
- let skipped = match self.array_reader.skip_records(front.row_count) {
- Ok(skipped) => skipped,
- Err(e) => return Some(Err(e.into())),
- };
-
- if skipped != front.row_count {
- return Some(Err(general_err!(
- "failed to skip rows, expected {}, got {}",
- front.row_count,
- skipped
- )
- .into()));
+ let mut read_records = 0;
+ match self.selection.as_mut() {
+ Some(selection) => {
+ while read_records < self.batch_size && !selection.is_empty() {
+ let front = selection.pop_front()?;
Review Comment:
```suggestion
let front = selection.pop_front().unwrap();
```
##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -287,43 +287,59 @@ impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
- let to_read = match self.selection.as_mut() {
- Some(selection) => loop {
- let front = selection.pop_front()?;
- if front.skip {
- let skipped = match self.array_reader.skip_records(front.row_count) {
- Ok(skipped) => skipped,
- Err(e) => return Some(Err(e.into())),
- };
-
- if skipped != front.row_count {
- return Some(Err(general_err!(
- "failed to skip rows, expected {}, got {}",
- front.row_count,
- skipped
- )
- .into()));
+ let mut read_records = 0;
+ match self.selection.as_mut() {
+ Some(selection) => {
+ while read_records < self.batch_size && !selection.is_empty() {
+ let front = selection.pop_front()?;
+ if front.skip {
+ let skipped =
+ match self.array_reader.skip_records(front.row_count) {
+ Ok(skipped) => skipped,
+ Err(e) => return Some(Err(e.into())),
+ };
+
+ if skipped != front.row_count {
+ return Some(Err(general_err!(
+ "failed to skip rows, expected {}, got {}",
+ front.row_count,
+ skipped
+ )
+ .into()));
+ }
+ continue;
}
- continue;
- }
- // try to read record
- let to_read = match front.row_count.checked_sub(self.batch_size) {
- Some(remaining) if remaining != 0 => {
- // if page row count less than batch_size we must set batch size to page row count.
- // add check avoid dead loop
- selection.push_front(RowSelection::select(remaining));
- self.batch_size
+ // try to read record
+ let need_read = self.batch_size - read_records;
+ let to_read = match front.row_count.checked_sub(need_read) {
+ Some(remaining) if remaining != 0 => {
+ // if page row count less than batch_size we must set batch size to page row count.
+ // add check avoid dead loop
+ selection.push_front(RowSelection::select(remaining));
+ need_read
+ }
+ _ => front.row_count,
+ };
+ match self.array_reader.read_records(to_read) {
+ Ok(rec) if rec == 0 =>
+ // no more data in file, the last batch in this reader
+ {
+ break
+ }
Review Comment:
```suggestion
Ok(0) => break,
```
##########
parquet/src/arrow/record_reader/mod.rs:
##########
@@ -198,12 +198,6 @@ where
self.num_records += buffered_records;
self.num_values += buffered_values;
- self.consume_def_levels();
Review Comment:
This makes sense to me
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]