scovich commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2193005292


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
 
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
   > The reader "detects" a zero byte record not by reading bytes from the data stream, but by interpreting the Avro schema it was given. The schema would look something like this: `{"type": "record", "name": "EmptyRecord", "fields": []}`.
   
   @jecsand838 thanks a lot for the context. This almost sounds like a flaw in the Avro spec, if there's no way for a decoder to make byte-level progress past an empty record? I'd be curious how other implementations handle it.
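
   (Partly answering my own question: if I'm reading the Avro spec right, the object container format frames every data block as a row `count` (long), a byte `size` (long), the serialized rows, and the 16-byte sync marker. So presumably implementations loop on `count` rather than on byte progress, which is what makes zero-byte rows survivable. Rough sketch with made-up types, not this crate's API:)

   ```rust
   // Per the Avro spec's object container framing, a block carries its own
   // row count. Looping on `count` instead of "bytes consumed > 0" means an
   // empty record schema (0 bytes per row) still makes progress.
   struct Block {
       count: usize,  // row count from the block header
       data: Vec<u8>, // serialized rows; empty when every row encodes to 0 bytes
   }

   /// `decode_row` stands in for a real per-row decoder and returns how many
   /// bytes the row occupied (legitimately 0 for `{"fields": []}`).
   fn decode_block(block: &Block, decode_row: impl Fn(&[u8]) -> usize) -> usize {
       let mut cursor = 0;
       for _ in 0..block.count {
           cursor += decode_row(&block.data[cursor..]);
       }
       block.count
   }

   fn main() {
       // A block of 3 rows under an empty record schema: 0 data bytes total.
       let block = Block { count: 3, data: vec![] };
       assert_eq!(decode_block(&block, |_| 0), 3);
   }
   ```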
   
   Also: what happens if the reader provides a schema with fewer fields than the file's schema? Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?
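
   (My possibly-wrong mental model is that the raw encoding is *not* self-describing: the same bytes parse differently under different schemas, so skipping unwanted columns would have to lean on the writer schema embedded in the file header rather than on the bytes themselves. Quick standalone illustration of the ambiguity, using plain zigzag-varint decoding and no arrow-avro types:)

   ```rust
   // The bytes [0x02, 0x54] decode to the longs 1 and 42, but nothing in the
   // stream says whether that's one row of a two-field record or two rows of
   // a one-field record: field and row boundaries come only from the schema.
   fn zigzag_decode(bytes: &[u8]) -> (i64, usize) {
       let (mut n, mut shift, mut used) = (0u64, 0, 0);
       for &b in bytes {
           n |= ((b & 0x7f) as u64) << shift;
           shift += 7;
           used += 1;
           if b & 0x80 == 0 {
               break;
           }
       }
       (((n >> 1) as i64) ^ -((n & 1) as i64), used)
   }

   fn main() {
       let bytes = [0x02u8, 0x54];
       let (first, used) = zigzag_decode(&bytes);
       let (second, _) = zigzag_decode(&bytes[used..]);
       assert_eq!((first, second), (1, 42)); // boundaries are schema-defined
   }
   ```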
   
   Asking because (according to my not-super-trustworthy AI coding assistant) there seem to be several cases here:
   1. The reader asked for an empty schema, even though the file contains data.
       * This should either blow up due to a schema mismatch or (more likely) just return however many empty rows the block contained.
       * It is correct to ignore all bytes in the block, because we don't want the columns they encode.
       * But wouldn't the decoder have consumed them anyway, as part of handling extra/unwanted columns in general?
   2. The writer produced an empty schema.
       * This should return however many empty rows the block contains.
       * The block should _not_ contain any bytes, because there was nothing to encode.
       * Seems like we should blow up if spurious bytes are present, unless the Avro spec says otherwise?
   3. [AI assistant claim] Nulls occupy zero bytes, and so an all-null row would occupy zero bytes even if the schema is non-empty.
       * I question this one, because there has to be _some_ way of knowing the row (or its values) are all-NULL, and that information would have to come from the byte stream the decoder is consuming? (See the sketch after this list.)
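
   On claim 3: my reading of the spec is that a nullable field is a union like `["null", "long"]`, and a union value is encoded as its zero-based branch index (a zigzag-varint long) followed by the branch's value. `null` itself contributes zero bytes, but the branch index still costs one byte, so an all-null row of N nullable fields should occupy N bytes, not zero. Standalone sketch (again not this crate's API):

   ```rust
   // Encodes a nullable long the way the spec describes unions: branch index
   // first (zigzag varint), then the value for that branch. A null is branch
   // 0 with no trailing value bytes: one byte total, not zero.
   fn encode_nullable_long(value: Option<i64>, out: &mut Vec<u8>) {
       match value {
           None => zigzag_encode(0, out), // branch 0 = "null"; nothing follows
           Some(v) => {
               zigzag_encode(1, out); // branch 1 = "long"
               zigzag_encode(v, out); // then the long itself
           }
       }
   }

   fn zigzag_encode(value: i64, out: &mut Vec<u8>) {
       let mut n = ((value << 1) ^ (value >> 63)) as u64;
       loop {
           let byte = (n & 0x7f) as u8;
           n >>= 7;
           if n == 0 {
               out.push(byte);
               return;
           }
           out.push(byte | 0x80);
       }
   }

   fn main() {
       let mut row = Vec::new();
       encode_nullable_long(None, &mut row);
       encode_nullable_long(None, &mut row);
       assert_eq!(row, [0x00, 0x00]); // two all-null fields = two bytes
   }
   ```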


