scovich commented on code in PR #8100:
URL: https://github.com/apache/arrow-rs/pull/8100#discussion_r2266533676
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -173,36 +173,29 @@ impl Decoder {
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
let mut total_consumed = 0usize;
while total_consumed < data.len() && self.remaining_capacity > 0 {
- if !self.awaiting_body {
- if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
- if n == 0 {
- break;
- }
- total_consumed += n;
- self.awaiting_body = true;
- self.apply_pending_schema_if_batch_empty();
- if self.remaining_capacity == 0 {
- break;
+ if self.awaiting_body {
+ match self.active_decoder.decode(&data[total_consumed..], 1) {
+ Ok(n) => {
+ self.remaining_capacity -= 1;
+ total_consumed += n;
+ self.awaiting_body = false;
+ continue;
}
- }
+ Err(ref e) if is_incomplete_data(e) => return Ok(total_consumed),
Review Comment:
```suggestion
Err(ref e) if is_incomplete_data(e) => break,
```
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -130,6 +129,16 @@ fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
})
}
+fn is_incomplete_data(err: &ArrowError) -> bool {
+ matches!(
+ err,
+ ArrowError::ParseError(msg)
+ if msg.contains("Unexpected EOF")
+ || msg.contains("bad varint")
+ || msg.contains("offset overflow")
Review Comment:
Hmm, after thinking more about this over the weekend:
Trying to interpret/suppress these errors by matching on their message text
will almost certainly make the decoder brittle in the face of malformed input
bytes that legitimately trigger these errors. For example, we could put the
decoder in an infinite loop where it keeps fetching more and more bytes in the
hope of eliminating the error, when the malformed data is already fully
contained in the existing buffer.
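One possible direction, sketched below under stated assumptions: have the
low-level reader report "ran out of bytes" as a distinct typed outcome instead
of an error string, so the caller never has to guess. `DecodeStatus`,
`DecodeError`, `read_varint` and `decode_all` are hypothetical names for
illustration, not existing arrow-avro APIs, and the varint here is a plain
unsigned one rather than Avro's zigzag encoding.

```rust
/// Hypothetical outcome type (not part of arrow-avro).
#[derive(Debug, PartialEq)]
enum DecodeStatus {
    /// A value was decoded, consuming this many bytes.
    Complete { value: u64, consumed: usize },
    /// The buffer ended mid-value; more bytes could resolve this.
    Incomplete,
}

/// Hypothetical error type (not part of arrow-avro).
#[derive(Debug, PartialEq)]
enum DecodeError {
    /// The bytes already in the buffer are invalid; more input cannot help.
    Malformed(&'static str),
}

/// Decode one unsigned base-128 varint (at most 10 bytes for a u64).
fn read_varint(buf: &[u8]) -> Result<DecodeStatus, DecodeError> {
    const MAX_LEN: usize = 10;
    let mut value = 0u64;
    for (i, &byte) in buf.iter().take(MAX_LEN).enumerate() {
        // The tenth byte may only carry the single remaining bit of a u64
        // and must not have its continuation bit set.
        if i == MAX_LEN - 1 && byte > 1 {
            return Err(DecodeError::Malformed("varint overflows u64"));
        }
        value |= u64::from(byte & 0x7f) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok(DecodeStatus::Complete { value, consumed: i + 1 });
        }
    }
    // Fewer than MAX_LEN bytes, all with the continuation bit set.
    Ok(DecodeStatus::Incomplete)
}

/// Decode as many varints as the buffer fully contains. Stops cleanly on
/// `Incomplete` and propagates `Malformed` instead of asking for more bytes.
fn decode_all(buf: &[u8]) -> Result<(Vec<u64>, usize), DecodeError> {
    let mut values = Vec::new();
    let mut offset = 0;
    while offset < buf.len() {
        match read_varint(&buf[offset..])? {
            DecodeStatus::Complete { value, consumed } => {
                values.push(value);
                offset += consumed;
            }
            DecodeStatus::Incomplete => break,
        }
    }
    Ok((values, offset))
}
```

With this shape, a truncated tail leaves the unconsumed bytes for the next
call, while an over-long varint fails immediately no matter how many more
bytes arrive, so the infinite-loop scenario cannot occur.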
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
self.active_decoder = new_decoder;
}
}
+ }
+
+ fn apply_pending_schema_if_batch_empty(&mut self) {
+ if self.remaining_capacity != self.batch_size {
+ return;
+ }
+ self.apply_pending_schema();
+ }
+
+ /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+ /// `Ok(None)` if no new rows are available.
+ pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+ if self.remaining_capacity == self.batch_size {
+ return Ok(None);
+ }
+ let batch = self.active_decoder.flush()?;
+ self.remaining_capacity = self.batch_size;
+ self.apply_pending_schema();
Review Comment:
`flush` and `flush_block` are identical except for this call to
`self.apply_pending_schema`?
Is there a way to deduplicate the code? Maybe a `flush_internal` that takes
a boolean argument (which the compiler would aggressively inline away as if it
were a generic parameter)?
Or just call `self.apply_pending_schema` unconditionally, knowing it should
be a no-op during block decoding because `self.pending_schema` is always `None`?
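A rough sketch of the `flush_internal` option, using a toy stand-in rather
than the real `Decoder`: the field names mirror the diff, but `ToyDecoder`,
its `push` method, the `Vec<u32>` rows and the string schemas are all
simplifications assumed for illustration, and the real methods return
`Result<Option<RecordBatch>, ArrowError>`.

```rust
/// Toy stand-in for the decoder state (hypothetical; types are simplified).
struct ToyDecoder {
    batch_size: usize,
    remaining_capacity: usize,
    rows: Vec<u32>,
    schema: &'static str,
    pending_schema: Option<&'static str>,
}

impl ToyDecoder {
    /// True when no rows have been decoded into the current batch.
    fn batch_is_empty(&self) -> bool {
        self.remaining_capacity == self.batch_size
    }

    /// Record one decoded row (stands in for the real decode path).
    fn push(&mut self, row: u32) {
        self.rows.push(row);
        self.remaining_capacity -= 1;
    }

    /// Switch to the pending schema, if any; a no-op otherwise.
    fn apply_pending_schema(&mut self) {
        if let Some(schema) = self.pending_schema.take() {
            self.schema = schema;
        }
    }

    /// Shared flush body; the flag is the only difference between callers.
    fn flush_internal(&mut self, apply_schema: bool) -> Option<Vec<u32>> {
        if self.batch_is_empty() {
            return None;
        }
        let batch = std::mem::take(&mut self.rows);
        self.remaining_capacity = self.batch_size;
        if apply_schema {
            self.apply_pending_schema();
        }
        Some(batch)
    }

    fn flush(&mut self) -> Option<Vec<u32>> {
        self.flush_internal(true)
    }

    fn flush_block(&mut self) -> Option<Vec<u32>> {
        self.flush_internal(false)
    }
}
```

If `pending_schema` really is always `None` during block decoding, the flag
becomes unnecessary and both public methods could share one body that calls
`apply_pending_schema` unconditionally.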
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
self.active_decoder = new_decoder;
}
}
+ }
+
+ fn apply_pending_schema_if_batch_empty(&mut self) {
+ if self.remaining_capacity != self.batch_size {
+ return;
+ }
+ self.apply_pending_schema();
Review Comment:
Also, we have quite a few places that could benefit from a small helper
method:
```rust
fn batch_is_empty(&self) -> bool {
self.remaining_capacity == self.batch_size
}
```
... which could improve readability, e.g.
```suggestion
if self.batch_is_empty() {
self.apply_pending_schema();
}
```
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -270,6 +271,24 @@ impl Decoder {
self.active_decoder = new_decoder;
}
}
+ }
+
+ fn apply_pending_schema_if_batch_empty(&mut self) {
+ if self.remaining_capacity != self.batch_size {
+ return;
+ }
+ self.apply_pending_schema();
Review Comment:
```suggestion
if self.remaining_capacity == self.batch_size {
self.apply_pending_schema();
}
```