jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2193371587


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
 
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
   @scovich 100% agreed, and the Avro spec can definitely be tricky and nuanced.
   
   > if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?
   
   From my research, the answer seems to be that no extra bytes exist after an “empty record” in Avro; the code should simply count that it has seen one record and move on.
   
   In the [Java implementation](https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java), `GenericDatumReader.readRecord` iterates over the field list; with zero fields the loop body never executes, so no reads occur and the input position stays where it is. The outer caller still decrements the [`blockRemaining`](https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L264) counter, so the `Decoder` read logic progresses correctly.
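   To illustrate that pattern in Rust (names and the fixed one-byte-per-field width are illustrative assumptions, not arrow-avro's API): a block drain loop bounded by the block's record count keeps making progress even when a zero-field record consumes zero bytes, mirroring Java's `blockRemaining` decrement.

```rust
/// Illustrative sketch, not arrow-avro's actual decoder: drain a block that
/// claims `record_count` records, where each record has `fields_per_record`
/// fields (assumed to occupy one byte each for simplicity).
fn drain_block(data: &[u8], record_count: u64, fields_per_record: usize) -> Result<usize, String> {
    let mut cursor = 0usize;
    for _ in 0..record_count {
        // A zero-field record consumes zero bytes, so the loop is bounded by
        // the record count (like Java's `blockRemaining`), not by byte progress.
        let consumed = fields_per_record;
        if cursor + consumed > data.len() {
            return Err("record overruns block".to_string());
        }
        cursor += consumed;
    }
    Ok(cursor)
}
```

   A block of three zero-field records then drains successfully with zero bytes consumed, instead of spinning forever on `consumed == 0`.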
   
   > what happens if the reader provides a schema with fewer fields than the file's schema?
   
   If the schema used to write the data (in this instance, the file's schema) cannot be resolved against the reader's schema per the [Avro specification's schema resolution requirements](https://avro.apache.org/docs/1.11.1/specification/#schema-resolution), then imo we should return an error for every record written with that unresolvable schema; handling the error would be up to the caller. In the scenario you called out, though, the missing fields would simply be ignored and no error returned, per the specification: `if the writer’s record contains a field with a name not present in the reader’s record, the writer’s value for that field is ignored.` (Remember that an Avro schema is just a Record type, with each row's fields defined as Avro Record Fields in that schema Record.)
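   A hedged sketch of that resolution rule (hypothetical names, flat schemas matched by field name only; real resolution also matches types, aliases, and defaults): writer-only fields are decoded and discarded, while a reader-only field with no default (defaults are not modeled here) makes resolution fail for every record.

```rust
#[derive(Debug, PartialEq)]
enum FieldAction {
    Read,       // present in both schemas: decode into the output
    SkipWriter, // writer-only: decode and discard, per the spec
}

/// Illustrative per-field resolution plan for two flat record schemas.
fn plan_fields(writer: &[&str], reader: &[&str]) -> Result<Vec<FieldAction>, String> {
    // A reader-only field would need a default value; without one,
    // resolution fails and every record should surface an error.
    if let Some(missing) = reader.iter().find(|f| !writer.contains(*f)) {
        return Err(format!("reader field '{missing}' absent from writer schema"));
    }
    Ok(writer
        .iter()
        .map(|f| {
            if reader.contains(f) {
                FieldAction::Read
            } else {
                FieldAction::SkipWriter
            }
        })
        .collect())
}
```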
   
   > Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?
   
   It should be, imo. From looking over the Java implementation, the writer schema must either be:
   1. Explicitly provided: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L122
   2. Provided in the stream's header: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L142
   
   Or a `MissingSchemaException` is thrown: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L42
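   That lookup order can be sketched as (illustrative signature, using `&str` as a stand-in for a parsed schema):

```rust
/// Resolve the writer schema: prefer one supplied explicitly, fall back to
/// the one embedded in the file header, otherwise fail, analogous to Java's
/// `MissingSchemaException`.
fn writer_schema<'a>(
    explicit: Option<&'a str>,
    header: Option<&'a str>,
) -> Result<&'a str, String> {
    explicit
        .or(header)
        .ok_or_else(|| "missing writer schema: none provided and none in header".to_string())
}
```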
   


