jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2192861710
##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,355 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
+        let root_field = AvroField::try_from(schema)?;
+        RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )
+    }
+
+    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
+        let header = read_header(reader)?;
+        let record_decoder = if let Some(schema) = &self.schema {
+            self.make_record_decoder(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file header".to_string())
+            })?;
+            self.make_record_decoder(&avro_schema)?
+        };
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok((header, decoder))
+    }
+
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+        let (header, decoder) = self.build_impl(&mut reader)?;
+        Ok(Reader {
+            reader,
+            header,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            block_cursor: 0,
+            finished: false,
+        })
+    }
+
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
+        match self.schema {
+            Some(ref schema) => {
+                let record_decoder = self.make_record_decoder(schema)?;
+                Ok(Decoder::new(record_decoder, self.batch_size))

Review Comment:
@scovich

> Ah, so the provided schema is taken at face value today, with no attempt to reconcile it against the file's actual schema? What happens if they disagree in matters large or small?

Currently that's correct, and if they disagree we should see an `ArrowError::ParseError`. I can add negative test cases to this PR to verify that, along with any fixes they turn up (one such test is sketched at the end of this comment). I do plan on adding full reader and writer schema support, with resolution logic to handle disagreements, in a follow-up PR; that logic will follow the specification: https://avro.apache.org/docs/1.11.1/specification/#schema-resolution

> Meanwhile -- it sounds like whoever creates a decoder must read the header first and provide the schema. Because otherwise they lose access to the compression codec, which readers are apparently responsible to deal with rather than decoders.

That's a good callout; I can add a `with_compression` option to the `ReaderBuilder` as well. I'm trying to ensure the foundation is in place to handle both object container files and raw binary encoding. Regarding the differences:
* Object container files always carry the schema in the file header, so the file is self-describing.
* Raw binary encoding is just a sequence of data blocks with no container-file header, so the schema isn't included in the data. The producer and consumer instead agree on the schema out of band. This is when you'll see the schema shipped as an `.avsc` file, or the schema (and compression codec) fetched from a schema registry. (Sketches of this flow are at the end of this comment.)

> That said, I never quite understood why the decoder's flush method doesn't handle compression?

I'm assuming you mean handling the compression logic in the `block_decoder` flush, i.e. `if let Some(block) = self.block_decoder.flush()`? My assumption was that we wanted to keep the compression logic loosely coupled to the block decoder. I don't see a reason we couldn't move it in, though.

CC: @tustvold
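For concreteness, the kind of negative test I have in mind looks roughly like this. It's a sketch only: the fixture path and schema literal are placeholders, and it assumes `Reader` implements `Iterator<Item = Result<RecordBatch, ArrowError>>` like the other Arrow readers:

```rust
use std::fs::File;
use std::io::BufReader;

use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::Schema as AvroSchema; // assumed import path for the PR's `AvroSchema`
use arrow_schema::ArrowError;

#[test]
fn provided_schema_mismatch_is_rejected() {
    // A schema that deliberately disagrees with the one in the file header.
    let wrong: AvroSchema<'static> = serde_json::from_str(
        r#"{"type":"record","name":"Wrong","fields":[{"name":"x","type":"string"}]}"#,
    )
    .unwrap();
    let file = File::open("path/to/fixture.avro").unwrap(); // placeholder fixture
    // Until schema resolution lands, the mismatch should surface as a
    // ParseError, either at build time or while decoding the first rows.
    let result: Result<Vec<_>, ArrowError> = ReaderBuilder::default()
        .with_schema(wrong)
        .build(BufReader::new(file))
        .and_then(|reader| reader.collect());
    assert!(matches!(result, Err(ArrowError::ParseError(_))));
}
```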
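And here's roughly how I picture the raw-binary path once the schema comes from an `.avsc` file or a registry. Again a sketch against this PR's builder API: the schema literal is made up, and it assumes `AvroSchema` deserializes from the Avro JSON schema text via serde:

```rust
use std::io;

use arrow_avro::reader::{Decoder, ReaderBuilder};
use arrow_avro::schema::Schema as AvroSchema; // assumed import path
use arrow_schema::ArrowError;

fn build_raw_decoder() -> Result<Decoder, ArrowError> {
    // Writer schema agreed on out of band, e.g. the contents of an .avsc
    // file (made-up schema for illustration).
    const SCHEMA_JSON: &str = r#"
        {"type":"record","name":"User","fields":[
            {"name":"id","type":"long"},
            {"name":"name","type":"string"}
        ]}"#;
    let schema: AvroSchema<'static> = serde_json::from_str(SCHEMA_JSON)
        .map_err(|e| ArrowError::ParseError(e.to_string()))?;
    // With an explicit schema, build_decoder (as of this diff) never reads a
    // container-file header, so an empty reader is fine here.
    ReaderBuilder::default()
        .with_schema(schema)
        .with_batch_size(1024)
        .build_decoder(io::empty())
}
```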
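Finally, the push-style loop I'd expect callers to write on top of `Decoder::decode` / `flush` / `batch_is_full` (a sketch; `decode_all` is an illustrative helper, not part of this PR):

```rust
use arrow_array::RecordBatch;
use arrow_avro::reader::Decoder;
use arrow_schema::ArrowError;

/// Feed a buffer of raw Avro-encoded rows through a `Decoder`, emitting a
/// `RecordBatch` whenever `batch_size` rows have accumulated.
fn decode_all(decoder: &mut Decoder, mut data: &[u8]) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut batches = Vec::new();
    while !data.is_empty() {
        let consumed = decoder.decode(data)?;
        data = &data[consumed..];
        if decoder.batch_is_full() {
            // Hit the row limit for this batch; flush and keep decoding.
            if let Some(batch) = decoder.flush()? {
                batches.push(batch);
            }
        } else if consumed == 0 {
            // Incomplete row at the end of the buffer; a streaming caller
            // would stash the remainder and wait for more bytes.
            break;
        }
    }
    // Flush any partially filled final batch.
    if let Some(batch) = decoder.flush()? {
        batches.push(batch);
    }
    Ok(batches)
}
```

The `consumed == 0` case is what distinguishes "batch full" from "need more bytes": per this diff, `decode` returns 0 both when the batch already holds `batch_size` rows and when the buffer ends mid-row, so the caller checks `batch_is_full` first.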