jecsand838 commented on code in PR #7834:
URL: https://github.com/apache/arrow-rs/pull/7834#discussion_r2193135966


##########
arrow-avro/src/reader/mod.rs:
##########
@@ -28,145 +101,355 @@ mod header;
 mod record;
 mod vlq;
 
-/// Configuration options for reading Avro data into Arrow arrays
-///
-/// This struct contains configuration options that control how Avro data is
-/// converted into Arrow arrays. It allows customizing various aspects of the
-/// data conversion process.
-///
-/// # Examples
-///
-/// ```
-/// # use arrow_avro::reader::ReadOptions;
-/// // Use default options (regular StringArray for strings)
-/// let default_options = ReadOptions::default();
-///
-/// // Enable Utf8View support for better string performance
-/// let options = ReadOptions::default()
-///     .with_utf8view(true);
-/// ```
-#[derive(Default, Debug, Clone)]
-pub struct ReadOptions {
-    use_utf8view: bool,
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
+    let mut decoder = HeaderDecoder::default();
+    loop {
+        let buf = reader.fill_buf()?;
+        if buf.is_empty() {
+            break;
+        }
+        let read = buf.len();
+        let decoded = decoder.decode(buf)?;
+        reader.consume(decoded);
+        if decoded != read {
+            break;
+        }
+    }
+    decoder.flush().ok_or_else(|| {
+        ArrowError::ParseError("Unexpected EOF while reading Avro header".to_string())
+    })
+}
+
+/// A low-level interface for decoding Avro-encoded bytes into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct Decoder {
+    record_decoder: RecordDecoder,
+    batch_size: usize,
+    decoded_rows: usize,
+}
+
+impl Decoder {
+    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
+        Self {
+            record_decoder,
+            batch_size,
+            decoded_rows: 0,
+        }
+    }
+
+    /// Return the Arrow schema for the rows decoded by this decoder
+    pub fn schema(&self) -> SchemaRef {
+        self.record_decoder.schema().clone()
+    }
+
+    /// Return the configured maximum number of rows per batch
+    pub fn batch_size(&self) -> usize {
+        self.batch_size
+    }
+
+    /// Feed `data` into the decoder row by row until we either:
+    /// - consume all bytes in `data`, or
+    /// - reach `batch_size` decoded rows.
+    ///
+    /// Returns the number of bytes consumed.
+    pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        let mut total_consumed = 0usize;
+        while total_consumed < data.len() && self.decoded_rows < self.batch_size {
+            let consumed = self.record_decoder.decode(&data[total_consumed..], 1)?;
+            if consumed == 0 {
+                break;
+            }
+            total_consumed += consumed;
+            self.decoded_rows += 1;
+        }
+        Ok(total_consumed)
+    }
+
+    /// Produce a `RecordBatch` if at least one row is fully decoded, returning
+    /// `Ok(None)` if no new rows are available.
+    pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
+        if self.decoded_rows == 0 {
+            Ok(None)
+        } else {
+            let batch = self.record_decoder.flush()?;
+            self.decoded_rows = 0;
+            Ok(Some(batch))
+        }
+    }
+
+    /// Returns the number of rows that can be added to this decoder before it is full.
+    pub fn capacity(&self) -> usize {
+        self.batch_size.saturating_sub(self.decoded_rows)
+    }
+
+    /// Returns true if the decoder has reached its capacity for the current batch.
+    pub fn batch_is_full(&self) -> bool {
+        self.capacity() == 0
+    }
+}
+
+/// A builder to create an [`Avro Reader`](Reader) that reads Avro data
+/// into Arrow `RecordBatch`.
+#[derive(Debug)]
+pub struct ReaderBuilder {
+    batch_size: usize,
+    strict_mode: bool,
+    utf8_view: bool,
+    schema: Option<AvroSchema<'static>>,
+}
+
+impl Default for ReaderBuilder {
+    fn default() -> Self {
+        Self {
+            batch_size: 1024,
+            strict_mode: false,
+            utf8_view: false,
+            schema: None,
+        }
+    }
 }
 
-impl ReadOptions {
-    /// Create a new `ReadOptions` with default values
+impl ReaderBuilder {
+    /// Creates a new [`ReaderBuilder`] with default settings:
+    /// - `batch_size` = 1024
+    /// - `strict_mode` = false
+    /// - `utf8_view` = false
+    /// - `schema` = None
     pub fn new() -> Self {
         Self::default()
     }
 
+    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
+        let root_field = AvroField::try_from(schema)?;
+        RecordDecoder::try_new_with_options(
+            root_field.data_type(),
+            self.utf8_view,
+            self.strict_mode,
+        )
+    }
+
+    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
+        let header = read_header(reader)?;
+        let record_decoder = if let Some(schema) = &self.schema {
+            self.make_record_decoder(schema)?
+        } else {
+            let avro_schema: Option<AvroSchema<'_>> = header
+                .schema()
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
+            let avro_schema = avro_schema.ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+            })?;
+            self.make_record_decoder(&avro_schema)?
+        };
+        let decoder = Decoder::new(record_decoder, self.batch_size);
+        Ok((header, decoder))
+    }
+
+    /// Sets the row-based batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
     /// Set whether to use StringViewArray for string data
     ///
     /// When enabled, string data from Avro files will be loaded into
     /// Arrow's StringViewArray instead of the standard StringArray.
-    pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
-        self.use_utf8view = use_utf8view;
+    pub fn with_utf8_view(mut self, utf8_view: bool) -> Self {
+        self.utf8_view = utf8_view;
         self
     }
 
     /// Get whether StringViewArray is enabled for string data
     pub fn use_utf8view(&self) -> bool {
-        self.use_utf8view
+        self.utf8_view
     }
-}
 
-/// Read a [`Header`] from the provided [`BufRead`]
-fn read_header<R: BufRead>(mut reader: R) -> Result<Header, ArrowError> {
-    let mut decoder = HeaderDecoder::default();
-    loop {
-        let buf = reader.fill_buf()?;
-        if buf.is_empty() {
-            break;
-        }
-        let read = buf.len();
-        let decoded = decoder.decode(buf)?;
-        reader.consume(decoded);
-        if decoded != read {
-            break;
+    /// Controls whether certain Avro unions of the form `[T, "null"]` should produce an error.
+    pub fn with_strict_mode(mut self, strict_mode: bool) -> Self {
+        self.strict_mode = strict_mode;
+        self
+    }
+
+    /// Sets the Avro schema.
+    ///
+    /// If a schema is not provided, the schema will be read from the Avro file header.
+    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
+        self.schema = Some(schema);
+        self
+    }
+
+    /// Create a [`Reader`] from this builder and a `BufRead`
+    pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
+        let (header, decoder) = self.build_impl(&mut reader)?;
+        Ok(Reader {
+            reader,
+            header,
+            decoder,
+            block_decoder: BlockDecoder::default(),
+            block_data: Vec::new(),
+            block_cursor: 0,
+            finished: false,
+        })
+    }
+
+    /// Create a [`Decoder`] from this builder and a `BufRead` by
+    /// reading and parsing the Avro file's header. This will
+    /// not create a full [`Reader`].
+    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
+        match self.schema {
+            Some(ref schema) => {
+                let record_decoder = self.make_record_decoder(schema)?;
+                Ok(Decoder::new(record_decoder, self.batch_size))

Review Comment:
   @scovich 
   
   > It's just weird that it's so easy to "wedge" yourself by not providing an explicit schema, which consumes the header in order to infer one, which makes it impossible to determine the compression codec you should use without re-reading the header.
   
   When dealing with the object container file format, i.e. your standard `.avro` files, the header has to be there and all records must match the schema in the header: https://avro.apache.org/docs/1.11.1/specification/#object-container-files. In the event `with_schema` is used AND we're reading from an object container file, the schema from `with_schema` would be the reader schema and the schema from the file's header would be the writer schema. In that event, or more generally whenever the reader and writer schemas differ but are compatible per the specification, part of decoding each row involves mapping data from the schema it was written with into the schema it must be read into Arrow with.
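
   To make that split concrete, here's a minimal sketch against this PR's builder API (the `reader_schema` value is hypothetical, since constructing an `AvroSchema<'static>` isn't shown in this hunk):
   ```rust
   use std::fs::File;
   use std::io::BufReader;
   use arrow_avro::reader::ReaderBuilder;

   // `reader_schema` is a hypothetical AvroSchema<'static> supplied by the caller.
   // Once schema resolution lands, it would act as the *reader* schema, while the
   // writer schema still comes from the object container file's header.
   let file = BufReader::new(File::open("data.avro")?);
   let reader = ReaderBuilder::new()
       .with_batch_size(1024)
       .with_utf8_view(true) // optional: decode strings into StringViewArray
       .with_schema(reader_schema)
       .build(file)?;
   ```
   Today `build` simply uses the `with_schema` schema and ignores the header's schema; with resolution in place the two would be reconciled per the spec.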
   
   To be clear, that logic is not currently in place because we aren't ready to 
implement schema resolution just yet, so some of the current code, such as:
   ```rust
           match self.schema {
               Some(ref schema) => {
                   let record_decoder = self.make_record_decoder(schema)?;
                   Ok(Decoder::new(record_decoder, self.batch_size))
               }
               None => {
                   let (_, decoder) = self.build_impl(&mut reader)?;
                   Ok(decoder)
               }
           }
   ```
   is placeholder logic.
   
   Also, I'll add that a compliant and mature Avro decoder is expected to handle schema resolution. It's one of the most powerful and fundamental features of the Avro format, and it's certainly one of the biggest reasons I'm working on this contribution rather than using another format. So we will definitely want it completed before marking the `arrow-avro` reader as public. I just need to finish and merge the PRs for field metadata, "out of spec" Impala support, and the remaining types, such as `Duration`, first.
   
   > a new_with_metadata constructor that allows the caller to provide the parquet footer, and also has a metadata method that allows the caller to access the parquet footer (regardless of whether it was provided or read internally)
   
   That's not a bad idea tbh. In scenarios where we are decoding streaming blocks (not files), maybe we can use a function like that to supply the writer's schema while the `with_schema` method still provides the reader's schema. From there maybe we can even support the caller changing the writer's schema as the streaming data comes in. Usually this scenario arises when decoding rows coming in via the protocol wire format: https://avro.apache.org/docs/1.11.1/specification/#protocol-wire-format. To be clear, I don't think the decoder should implicitly infer a schema. It should be the responsibility of the caller to identify when the writer schema has changed and to call a method that updates the decoder, imo.
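
   For the streaming case, I'd expect callers to drive the low-level `Decoder` roughly like this (a sketch against this PR's API; `next_chunk()` and `writer_schema` are hypothetical, and block framing/compression is ignored):
   ```rust
   use arrow_avro::reader::ReaderBuilder;

   // `writer_schema` is a hypothetical AvroSchema<'static>. With a schema set,
   // build_decoder never reads a file header, so an empty reader suffices.
   let mut decoder = ReaderBuilder::new()
       .with_batch_size(1024)
       .with_schema(writer_schema)
       .build_decoder(std::io::empty())?;

   let mut buf: Vec<u8> = Vec::new();
   while let Some(chunk) = next_chunk() { // hypothetical source of raw Avro bytes
       buf.extend_from_slice(&chunk);
       loop {
           let consumed = decoder.decode(&buf)?;
           buf.drain(..consumed); // keep any partially decoded row for later
           if decoder.batch_is_full() {
               if let Some(batch) = decoder.flush()? {
                   // hand the completed RecordBatch downstream
               }
           } else {
               break; // need more bytes before another row can decode
           }
       }
   }
   if let Some(batch) = decoder.flush()? {
       // final partial batch
   }
   ```
   A `new_with_metadata`-style constructor could then swap the writer schema mid-stream without touching the buffered bytes.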


