jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2193371587
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -329,64 +334,41 @@ impl<R> Reader<R> {
     pub fn avro_header(&self) -> &Header {
         &self.header
     }
-}
-impl<R: BufRead> Reader<R> {
     /// Reads the next [`RecordBatch`] from the Avro file or `Ok(None)` on EOF
     fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.finished {
-            return Ok(None);
-        }
-        loop {
-            if !self.block_data.is_empty() {
-                let consumed = self.decoder.decode(&self.block_data)?;
-                if consumed > 0 {
-                    self.block_data.drain(..consumed);
-                }
-                match self.decoder.flush()? {
-                    None => {
-                        if !self.block_data.is_empty() {
-                            break;
-                        }
-                    }
-                    Some(batch) => {
-                        return Ok(Some(batch));
-                    }
-                }
-            }
-            let maybe_block = {
+        'outer: while !self.finished && !self.decoder.batch_is_full() {
+            while self.block_cursor == self.block_data.len() {
                 let buf = self.reader.fill_buf()?;
                 if buf.is_empty() {
-                    None
-                } else {
-                    let read_len = buf.len();
-                    let consumed_len = self.block_decoder.decode(buf)?;
-                    self.reader.consume(consumed_len);
-                    if consumed_len == 0 && read_len != 0 {
-                        return Err(ArrowError::ParseError(
-                            "Could not decode next Avro block from partial data".to_string(),
-                        ));
-                    }
-                    self.block_decoder.flush()
+                    self.finished = true;
+                    break 'outer;
                 }
-            };
-            match maybe_block {
-                Some(block) => {
-                    let block_data = if let Some(ref codec) = self.compression {
+                // Try to decode another block from the buffered reader.
+                let consumed = self.block_decoder.decode(buf)?;
+                self.reader.consume(consumed);
+                if let Some(block) = self.block_decoder.flush() {
+                    // Successfully decoded a block.
+                    let block_data = if let Some(ref codec) = self.header.compression()? {
                         codec.decompress(&block.data)?
                     } else {
                         block.data
                     };
                     self.block_data = block_data;
+                    self.block_cursor = 0;
+                } else if consumed == 0 {
+                    // The block decoder made no progress on a non-empty buffer.
+                    return Err(ArrowError::ParseError(
+                        "Could not decode next Avro block from partial data".to_string(),
+                    ));
                 }
-                None => {
-                    self.finished = true;
-                    if !self.block_data.is_empty() {
-                        let consumed = self.decoder.decode(&self.block_data)?;
-                        self.block_data.drain(..consumed);
-                    }
-                    return self.decoder.flush();
-                }
+            }
+            // Try to decode more rows from the current block.
+            let consumed = self.decoder.decode(&self.block_data[self.block_cursor..])?;
+            if consumed == 0 && self.block_cursor < self.block_data.len() {
+                self.block_cursor = self.block_data.len();

Review Comment:
@scovich 100%, and the Avro spec can definitely be tricky and nuanced.

> if there's no way to consume bytes after an empty record? I'd be curious how other implementations handle it?

From my research, the answer seems to be that no extra bytes exist after an "empty record" in Avro, so the code should simply count that it has seen one record and move on. In the [Java implementation](https://github.com/apache/avro/blob/release-1.11.1/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java), `GenericDatumReader.readRecord` iterates over the field list; with zero fields the loop body never executes, so no read calls are made and the input position stays where it is. The outer caller still decrements the [`blockRemaining`](https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L264) counter, so the `Decoder` read logic progresses correctly.
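To make that concrete, here's a minimal Rust sketch of the idea (all names here are hypothetical, not the arrow-avro or Avro Java API): progress through a block of empty records is driven by the block's row count, not by the cursor advancing.

```rust
// Hypothetical stand-in for a per-record decoder. A real decoder would read
// varints, strings, etc. per the writer schema; here each field "costs" at
// most one byte, so a zero-field record always consumes zero bytes.
struct RecordDecoder {
    field_count: usize, // number of fields in the (hypothetical) record schema
}

impl RecordDecoder {
    /// Decode one record from `buf`, returning the bytes consumed.
    /// With zero fields this is always 0, yet the decode still succeeded.
    fn decode_record(&self, buf: &[u8]) -> usize {
        self.field_count.min(buf.len())
    }
}

fn main() {
    let decoder = RecordDecoder { field_count: 0 };
    let block: [u8; 0] = []; // a block of empty records carries no body bytes
    let mut rows_remaining = 3i64; // from the block's row-count header
    let mut cursor = 0usize;

    // Termination is driven by `rows_remaining` (like Java's blockRemaining),
    // not by watching `cursor` advance, so a block of empty records finishes
    // even though no bytes are ever consumed.
    while rows_remaining > 0 {
        cursor += decoder.decode_record(&block[cursor..]);
        rows_remaining -= 1;
    }
    assert_eq!(cursor, 0);
    println!("decoded 3 empty records, consumed {cursor} bytes");
}
```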
> what happens if the reader provides a schema with fewer fields than the file's schema?

If the schema used to write the data (in this instance the file's schema) cannot be resolved against the reader's schema per the [Avro specification's schema resolution rules](https://avro.apache.org/docs/1.11.1/specification/#schema-resolution), then IMO we should return an error for every record written using that unresolvable schema. Handling the error would then be up to the caller. In the scenario you called out, though, the missing fields would be ignored and no error returned, per the specification: "if the writer’s record contains a field with a name not present in the reader’s record, the writer’s value for that field is ignored." (Remember that an Avro schema is just a record type, with the fields of each row defined as Avro record fields in the schema record.)

> Is the data layout self-describing enough that the decoder can reliably detect the extra bytes and either skip them or blow up?

It should be, IMO. From looking over the Java implementation, the writer schema either needs to be:
1. Explicitly provided: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L122
2. Provided in the first row (header) of the stream: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/file/DataFileStream.java#L142

Otherwise a `MissingSchemaException` is thrown: https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java#L42
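For the "fewer fields" case specifically, here's a hedged sketch of the resolution rule quoted above (hypothetical names, not the arrow-rs API): fields are matched by name, writer-only fields are decoded and discarded to keep the cursor in sync, and a reader-only field with no writer match (and, in a real implementation, no default) makes the schemas unresolvable.

```rust
// Per-field plan computed once when the two schemas are resolved, then
// applied to every record in the file.
#[derive(Debug)]
enum FieldAction {
    Read, // field exists in both schemas: decode into the output column
    Skip, // writer-only field: decode to advance the cursor, then discard
}

fn resolve(
    writer_fields: &[&str],
    reader_fields: &[&str],
) -> Result<Vec<FieldAction>, String> {
    // Every reader field must resolve to a writer field (or carry a
    // default, which this sketch omits); otherwise the schemas are
    // incompatible and every record should error.
    for rf in reader_fields {
        if !writer_fields.contains(rf) {
            return Err(format!("reader field `{rf}` has no writer match or default"));
        }
    }
    // Decode order must follow the writer schema, since that is the layout
    // of the bytes on disk.
    Ok(writer_fields
        .iter()
        .map(|wf| {
            if reader_fields.contains(wf) {
                FieldAction::Read
            } else {
                FieldAction::Skip // "the writer's value for that field is ignored"
            }
        })
        .collect())
}

fn main() {
    // Reader asks for fewer fields than the file contains: `b` is skipped.
    let actions = resolve(&["a", "b", "c"], &["a", "c"]).unwrap();
    println!("{actions:?}"); // [Read, Skip, Read]
}
```

The key design point is that skipping is still a decode: the writer-only field's bytes must be walked per the writer schema to find where the next field starts, which is exactly why the writer schema has to come from one of the two sources above.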